The node relationships in the graph are as follows:
enter image description here
- Parallel operation of nodes b and c
- No blocking in nodes b and b2
- Node C has sleep
[*] Warum sollte B2 nach c? < /li>
Warum kann B 2 nach der Ausführung nicht ausgeführt werden? /> Langchain: 0.3.27 < /li>
Langchain-Core: 0.3.75 < /li>
Blanggraph:0.6.6
Langgraph-PRAPHK: 0.6.4 < /li>
Langgraph-Sdk: 0.2 /li>
Langgraph-Sdk: 0.2 /li>
Langgraph-SDK: 0.2 /li>
Langgraph-praphen: 0.2 /li>
LangGraph-Checkpoint: 2.1.1 < /li>
< /ul>
Code < /H3>
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
import threading
import time
import asyncio
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
async def a(state: State):
print(f'====== thread_id: {threading.get_ident()} time:{time.strftime("%H:%M:%S", time.localtime())} Adding "A" to {state["aggregate"]}')
return {"aggregate": ["A"]}
async def b(state: State):
#time.sleep(3)
print(f'====== thread_id: {threading.get_ident()} time:{time.strftime("%H:%M:%S", time.localtime())} Adding "B" to {state["aggregate"]}')
return {"aggregate": ["B"]}
async def b_2(state: State):
print(f'====== thread_id: {threading.get_ident()} time:{time.strftime("%H:%M:%S", time.localtime())} Adding "B_2" to {state["aggregate"]}')
return {"aggregate": ["B_2"]}
async def c(state: State):
await asyncio.sleep(3) # sleep operation
print(f'====== thread_id: {threading.get_ident()} time:{time.strftime("%H:%M:%S", time.localtime())} Adding "C" to {state["aggregate"]}')
return {"aggregate": ["C"]}
async def d(state: State):
print(f'====== thread_id: {threading.get_ident()} time:{time.strftime("%H:%M:%S", time.localtime())} Adding "D" to {state["aggregate"]}')
return {"aggregate": ["D"]}
async def condition_a2bc(state: State):
return [Send("b", state), Send("c", state)]
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True) # defer=True
#builder.add_node(d)
builder.set_entry_point("a")
builder.add_conditional_edges("a", condition_a2bc, ["b", "c"])
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
#builder.add_edge(["b_2", "c"], "d")
builder.add_edge("d", END)
#builder.add_edge("b_2", END)
#builder.add_edge("c", END)
graph = builder.compile()
async def main():
async for msg, metadata in graph.astream({"aggregate": []}, stream_mode="messages"):
print(msg)
print(metadata)
if __name__ == "__main__":
asyncio.run(main())
< /code>
Hinweis < /h3>
Ich habe die Operationen in den Kommentaren ausprobiert. /> Alle Drucke sind gleich < /strong> < /p>
Ausgabe < /h3>
====== thread_id: 8715454528 time:21:47:56 Adding "A" to []
====== thread_id: 8715454528 time:21:47:56 Adding "B" to ['A']
====== thread_id: 8715454528 time:21:47:59 Adding "C" to ['A']
====== thread_id: 8715454528 time:21:47:59 Adding "B_2" to ['A', 'B', 'C']
====== thread_id: 8715454528 time:21:47:59 Adding "D" to ['A', 'B', 'C', 'B_2']
< /code>
So erhalten Sie die korrekte Ausführungssequenz? /> < /ul>
Aber die Drucke sind gleich , alles falsch < /p>