Aber das Problem ist, wenn ich einen Agenten in einem Workflow hinzufüge (
Code: Select all
StateGraphCode: Select all
from langgraph.graph import MessagesState
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import MemorySaver
from langchain.agents import create_agent
from dotenv import load_dotenv
load_dotenv()
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.1, max_tokens=1000)
agent = create_agent(
model=model,
tools=[],
)
def agent_node(state: MessagesState):
return agent.invoke({"messages": state["messages"]})
checkpointer = MemorySaver()
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_node) # Adding a node containing agent
# workflow.add_node("agent", agent) # Or, we could have also added the agent directly, but it will not help in token-by-token streaming
workflow.add_edge(START, "agent")
compiled_workflow = workflow.compile(checkpointer=checkpointer)
def response(user_input, agent_or_workflow, config):
user_input = {"messages": [("human", user_input)]}
for mode_chunk in agent_or_workflow.stream(user_input, config, stream_mode=["messages"]):
mode, chunk = mode_chunk
if mode == "messages":
token, metadata = chunk
if token.content:
print(token.content, end="\n", flush=True)
if __name__ == "__main__":
config = {"configurable": {"thread_id": "chat-1"}}
# The following will stream token-by-token
response("Hi", agent, config)
print("-" * 30)
# The following will NOT stream token-by-token
response("Hi", compiled_workflow, config)
Wie löse ich dieses Problem?
In meinem Anwendungsfall wird dieser Workflow das Backend eines Voice-Bots sein. Daher ist der Durchsatz entscheidend. Und ich kümmere mich nur um das Streaming der Knoten, die dem Benutzer zugewandt sind, d. h. die Ausgabe soll an den Benutzer gesendet werden. Jeder Zwischenknoten, bei dem die Ausgabe nicht für den Benutzer sichtbar oder gehört werden soll – I Für sie ist es egal, ob sie streamen oder nicht, denn für sie müssen wir sowieso die Gesamtausgabe erhalten, damit wir mit dem nächsten Schritt im Workflow fortfahren können.
Hinweis:
- Ich brauche Echtzeit-Streaming, nicht nur tokenweise Schleife über die bereits erhaltene vollständige Nachricht.
- Ich möchte Langgraphs integrierte Methode create_agent verwenden und NICHT nachahmen Der Agent mit llm mit bind_tools, und Streaming funktioniert dafür. Ich glaube jedoch, dass ich die integrierten Funktionen (z. B. Middleware) verliere, die in Agenten enthalten sind, die mit der integrierten Methode „create_agent“ erstellt wurden.
Mobile version