Es kann nicht als Stream Token für Token geantwortet werden, wenn ein Agent Teil eines Workflows in Langgraph istPython

Python-Programme
Anonymous
 Es kann nicht als Stream Token für Token geantwortet werden, wenn ein Agent Teil eines Workflows in Langgraph ist

Post by Anonymous »

Ich entwerfe einen komplexen Workflow mit mehreren Agenten. Einige der Agenten werden als Vermittler fungieren und andere werden dem Benutzer zugewandt sein (das heißt, ich zeige dem Benutzer die Ausgabe dieser Agenten). Da mehrere Agenten beteiligt sind, muss ich einen Workflow verwenden.
Aber das Problem ist, wenn ich einen Agenten in einem Workflow hinzufüge (

Code: Select all

StateGraph
), verliert es seine Fähigkeit, Token für Token zu streamen. Der Einfachheit halber zeige ich dieses Verhalten mit einem einzelnen Agentenknoten in einem Workflow:

Code: Select all

from langgraph.graph import MessagesState
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver
from langchain.agents import create_agent
from dotenv import load_dotenv

load_dotenv()
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.1, max_tokens=1000)

agent = create_agent(
model=model,
tools=[],
)

def agent_node(state: MessagesState):
return agent.invoke({"messages": state["messages"]})

checkpointer = MemorySaver()
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_node) # Adding a node containing agent
# workflow.add_node("agent", agent) # Or, we could have also added the agent directly, but it will not help in token-by-token streaming
workflow.add_edge(START, "agent")
compiled_workflow = workflow.compile(checkpointer=checkpointer)

def response(user_input, agent_or_workflow, config):
user_input = {"messages": [("human", user_input)]}
for mode_chunk in agent_or_workflow.stream(user_input, config, stream_mode=["messages"]):
mode, chunk = mode_chunk

if mode == "messages":
token, metadata = chunk
if token.content:
print(token.content, end="\n", flush=True)

if __name__ == "__main__":
config = {"configurable": {"thread_id": "chat-1"}}

# The following will stream token-by-token
response("Hi", agent, config)
print("-" * 30)
# The following will NOT stream token-by-token
response("Hi", compiled_workflow, config)
Hier sehen Sie: Wenn ich den Agenten als eigenständige Entität aufrufe, kann er Token für Token gut streamen. Aber sobald es Teil eines Workflows ist, kann es das nicht mehr. Da ich den Agenten mit einem Knoten agent_node kapsele, dachte ich zunächst, dass dies den Schritt möglicherweise zu einem blockierenden Vorgang machen würde, also habe ich versucht, agent direkt als Knoten hinzuzufügen. Aber das hat auch nicht geholfen (siehe die kommentierte Zeile im Workflow-Erstellungsprozess.
Wie löse ich dieses Problem?
In meinem Anwendungsfall wird dieser Workflow das Backend eines Voice-Bots sein. Daher ist der Durchsatz entscheidend. Und ich kümmere mich nur um das Streaming der Knoten, die dem Benutzer zugewandt sind, d. h. die Ausgabe soll an den Benutzer gesendet werden. Jeder Zwischenknoten, bei dem die Ausgabe nicht für den Benutzer sichtbar oder gehört werden soll – I Für sie ist es egal, ob sie streamen oder nicht, denn für sie müssen wir sowieso die Gesamtausgabe erhalten, damit wir mit dem nächsten Schritt im Workflow fortfahren können.
Hinweis:
  • Ich brauche Echtzeit-Streaming, nicht nur tokenweise Schleife über die bereits erhaltene vollständige Nachricht.
  • Ich möchte Langgraphs integrierte Methode create_agent verwenden und NICHT nachahmen Der Agent mit llm mit bind_tools, und Streaming funktioniert dafür. Ich glaube jedoch, dass ich die integrierten Funktionen (z. B. Middleware) verliere, die in Agenten enthalten sind, die mit der integrierten Methode „create_agent“ erstellt wurden.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post