Code: Select all
def email_router(state: TypedDict) -> dict:
    """Fetch mail and record which node the graph should run next.

    Calls ``get_emails`` and stores the routing decision under
    ``state["internal_state"]``: ``"topics_router"`` when ``get_emails``
    reports ``"New Email"``, otherwise ``"return_final_status"`` together
    with ``state["errors"] = "No New Mail"``.

    Args:
        state: Graph state mapping handed to this node.

    Returns:
        The state mapping returned by ``get_emails`` with the routing keys
        filled in.
    """
    node_results, state = get_emails(state)
    if node_results == "New Email":
        state["internal_state"] = "topics_router"
    else:
        state["errors"] = "No New Mail"
        state["internal_state"] = "return_final_status"
    # Hand the updated state back to the graph instead of relying on
    # in-place mutation: a node that returns None contributes no state
    # update, so the conditional edge would not see "internal_state".
    # NOTE(review): if the graph state declares reducer-annotated keys,
    # return only the keys changed here instead of the whole mapping - confirm.
    return state
def check_internal_state(state: TypedDict) -> str:
    """Return the routing key stored in ``state["internal_state"]``.

    Used as the path function of the conditional edges: the returned string
    is looked up in the edge's mapping to pick the next node.

    Args:
        state: Graph state mapping; must contain ``"internal_state"``.

    Returns:
        The value of ``state["internal_state"]``.

    Raises:
        KeyError: If ``"internal_state"`` is missing from ``state``.
    """
    internal_state = state["internal_state"]
    # Lazy %-style arguments produce the same message as the former f-string,
    # which reused double quotes inside a double-quoted f-string and is a
    # SyntaxError on Python versions before 3.12.
    logging.debug("Internal State: %s", internal_state)
    return internal_state
Code: Select all
# After "email_router" runs, check_internal_state returns a routing key that
# is looked up in this mapping; each key maps to the node of the same name.
workflow.add_conditional_edges(
    "email_router",
    check_internal_state,
    {target: target for target in ("topics_router", "return_final_status")},
)
Der Code ist in Klassen organisiert. Eine davon enthält eine Funktion, die den Workflow kompiliert und das Ergebnis wie folgt speichert:
Code: Select all
# Compile the workflow and keep the result in the module-level name
# ``mailman``.
# NOTE(review): per the surrounding text this fragment sits inside a
# function, which is why ``global`` is declared - confirm against the
# full source.
global mailman
mailman = workflow.compile()
Code: Select all
# Assemble the routing graph. Each router node is followed by a conditional
# edge whose path function (check_internal_state) returns a key that either
# advances to the next router or jumps to "return_final_status".
workflow = StateGraph(GraphState)

# Register the nodes in the same order as before.
for _node_name, _node_fn in (
    ("email_router", email_router),
    ("topics_router", topics_router),
    ("status_router", status_router),
    ("actions_router", actions_router),
    ("return_final_status", return_final_status_node),
):
    workflow.add_node(_node_name, _node_fn)

workflow.set_entry_point("email_router")

# (source node, node to continue with) - every source may also bail out
# to "return_final_status".
for _source, _next_node in (
    ("email_router", "topics_router"),
    ("topics_router", "status_router"),
    ("status_router", "actions_router"),
):
    workflow.add_conditional_edges(
        _source,
        check_internal_state,
        {
            _next_node: _next_node,
            "return_final_status": "return_final_status",
        },
    )

# Unconditional tail: actions -> final status -> END.
for _start, _end in (
    ("actions_router", "return_final_status"),
    ("return_final_status", END),
):
    workflow.add_edge(_start, _end)
Mobile version