Warum wird mein Status in meinem LangGraph-Workflow nicht korrekt übergeben?Python

Python-Programme
Anonymous
 Warum wird mein Status in meinem LangGraph-Workflow nicht korrekt übergeben?

Post by Anonymous »

Ich habe eine einfache Reihe von Knoten, die mit bedingten Kanten miteinander verkettet sind. Die ersten beiden werden hier gezeigt:

Code: Select all

def email_router(state: TypedDict) -> None:
node_results, state = get_emails(state)
if node_results == "New Email":
state["internal_state"] = "topics_router"
else:
state["errors"] = "No New Mail"
state["internal_state"] = "return_final_status"

def check_internal_state(state: TypedDict) -> str:
logging.debug(f"Internal State: {state["internal_state"]}")
return state["internal_state"]
Im E-Mail-Router setze ich den Statuswert „internal_state“ entweder auf „topics_router“ oder „return_final_status“, was zwei weitere Knoten sind. Der check_internal_state dient lediglich dazu, die bedingte Kante zu aktivieren, die wie folgt aussieht:

Code: Select all

workflow.add_conditional_edges(
"email_router",
check_internal_state,
{
"topics_router": "topics_router",
"return_final_status": "return_final_status",
},
)
Die Protokollierungs-Debug-Anweisung im check_internal_state gibt ständig eine leere Zeichenfolge zurück, was bedeutet, dass ich die Statusinformationen nicht ordnungsgemäß übertrage und nicht herausfinden kann, was ich falsch mache.
Der Code ist in Klassen festgelegt. die eine Funktion, die den Code kompiliert und die Ausgabe speichert als:

Code: Select all

global mailman
mailman = workflow.compile()
Ich habe eine andere Funktion in derselben Python-Datei, die dann Mailman aufruft und Funktionen aufruft, die sich alle in einer anderen Python-Datei befinden, aber importiert werden. Ich spreche das nur an, weil vielleicht die Aufteilung in mehrere Funktionen das Problem verursacht? Hier ist der Gesamtablauf:

Code: Select all

workflow = StateGraph(GraphState)
workflow.add_node("email_router", email_router)
workflow.add_node("topics_router", topics_router)
workflow.add_node("status_router", status_router)
workflow.add_node("actions_router", actions_router)
workflow.add_node("return_final_status", return_final_status_node)

workflow.set_entry_point("email_router")

workflow.add_conditional_edges(
"email_router",
check_internal_state,
{
"topics_router": "topics_router",
"return_final_status": "return_final_status",
},
)

workflow.add_conditional_edges(
"topics_router",
check_internal_state,
{
"status_router": "status_router",
"return_final_status": "return_final_status",
},
)

workflow.add_conditional_edges(
"status_router",
check_internal_state,
{
"actions_router": "actions_router",
"return_final_status": "return_final_status",
},
)

workflow.add_edge("actions_router", "return_final_status")
workflow.add_edge("return_final_status", END)
Weiß jemand, was ich falsch mache? Danke!

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post