Apache -Luftstrom: Ignorieren Sie die implizite Taskgroup beim Erstellen einer AufgabePython

Python-Programme
Anonymous
 Apache -Luftstrom: Ignorieren Sie die implizite Taskgroup beim Erstellen einer Aufgabe

Post by Anonymous »

Ich generiere dynamisch basierend auf JSON-Dateien einige DAGs. "Sub-Dag" (es ist technisch gesehen kein Subdagoperator , aber Sie erhalten die Idee) und erstellen Sie Aufgaben in dieser Unterdag. Ich fange auch jede implizite Taskgroup , die über meiner while -Schleife standen. Meine Aufgaben in der "unabhängigen" Unterdag erwarten also eine Gruppe, die in ihrer eigenen Dag nicht existiert, aber nur in der Hauptdag. Inside.

Code: Select all

            if task_group:
print(f"[parse_wrkflw] task_group={task_group}")
else:
print(f"[parse_wrkflw] task_group is None")
print(f"[parse_wrkflw] task_id={task_id}")

task = PythonOperator(
task_id=task_id,
python_callable=execute_sql,
op_args=[sql_file],
dag=dag,
task_group=task_group
)

print(f"[parse_wrkflw] task.task_id={task.task_id}")
< /code>
Ich habe: < /p>
INFO - [parse_wrkflw] task_group is None
INFO - [parse_wrkflw] task_id=_if_0_true_while_0_loop_content_call_sql_0_test_file
INFO - [parse_wrkflw] task.task_id=_if_0_true_tasks._if_0_true_while_0_loop_content_call_sql_0_test_file
… wo Sie _if_0_true_tasks die Taskgroup id sehen können.

Code: Select all

{
"wrkflw" : [ {
"typ" : "IF",
"el" : "cond1",
"children" : [ {
"typ" : "WHILE",
"el" : "cond2",
"children" : [ {
"typ" : "CALL SQL",
"el" : "test_file.sql"
} ]
} ]
} ]
}
… und bei der Verarbeitung des Typs "if" erstelle ich eine Taskgroup , um die Kinder dieses Blocks zu halten:

Code: Select all

with TaskGroup(group_id=f"{if_prefix}_true_tasks", task_group=task_group, dag=dag) as tg_true:
...
(Erstellen der DAG rekursiv durch Aufrufen der Funktion PARSE_WRKFLW für Kinder, die Aufgaben und Gruppen basierend auf der Typ der aktuellen Elemente erstellt)
Die Idee mit der while logic ist diese:

Code: Select all

{previous tasks}
>> Eintrag_task (

Code: Select all

EmptyOperator
) >> condition_task (

Code: Select all

BranchPythonOperator
prüft, ob die Bedingung erfüllt ist, und gibt den nächsten Task_id zurück, um zwischen trigger_loop_dag und end_loop ) >> [zu ausgeführt. [

Code: Select all

trigger_loop_dag
(

Code: Select all

TriggerDagRunOperator
mit wait_for_completion = true ) | end_loop (

Code: Select all

EmptyOperator
mit Trigger_Rule = TriggerRule.none_failed_min_one_success )]] und Trigger_loop_dag >> end_loop >> {nächste Aufgaben}

Code: Select all

trigger_loop_dag
ruft eine DAG auf, die für die Schleife bestimmt ist:

Code: Select all

{children tasks of the WHILE}
>> Loop_branch_task (

Code: Select all

BranchPythonOperator
prüft, ob die Bedingung erfüllt ist, und gibt den nächsten Task_id zurück, um zwischen Trigger_self und end_loop ) >> [zu ausgeführt. [

Code: Select all

trigger_self
(

Code: Select all

TriggerDagRunOperator
mit wait_for_completion = true nennt sich selbst) | end_loop (

Code: Select all

EmptyOperator
)]
Dies funktioniert im Grunde genommen vollkommen in Ordnung, wenn ich keine implizite Taskgroup über der Weile habe.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post