Ich verwende ExternalTaskSensor und die Aufgabe stößt immer wieder auf
Posted: 17 Jan 2025, 06:13
Ich habe zwei DAGs, um die Verwendung von ExternalTaskSensor zu testen
Der ExternalTaskSensortask sagt jedoch ständig „Poking“ und wird nicht beendet.
Was könnte passieren?
Geben Sie hier die Bildbeschreibung ein
Das sind meine DAGs:
Wenn jemand das Problem versteht, bin ich dafür dankbar.
Ich habe bereits einige Änderungen mit Execution_delta vorgenommen, aber ohne Erfolg.Ich versuche die Lösung für mein Problem zu verstehen
Der ExternalTaskSensortask sagt jedoch ständig „Poking“ und wird nicht beendet.
Was könnte passieren?
Geben Sie hier die Bildbeschreibung ein
Das sind meine DAGs:
Code: Select all
from datetime import timedelta, datetime
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
dag_id = 'teste_dagA'
default_args = {
'owner': 'Engineering',
'retries': 2,
'retry_delay': timedelta(minutes=1),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
}
with DAG(
dag_id=dag_id,
tags=['teste', 'lakehouse', 'mediator'],
default_args=default_args,
description='teste',
schedule_interval=timedelta(minutes=3),
start_date=datetime(2025, 1, 15,0,0,0),
catchup=False,
) as dag:
empty_start_task = EmptyOperator(task_id='empty-start-task')
empty_end_task = BashOperator(
task_id='end',
bash_command='echo "Tarefa concluída com sucesso!"'
)
empty_start_task >> empty_end_task
Code: Select all
from datetime import timedelta, datetime
import pendulum
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
dag_id = 'teste_mediator'
default_args = {
'owner': 'Engineering',
'retries': 2,
'retry_delay': timedelta(minutes=1),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
}
with DAG(
dag_id=dag_id,
tags=['teste', 'lakehouse', 'mediator'],
default_args=default_args,
description='teste',
schedule_interval=timedelta(minutes=3),
start_date=datetime(2025, 1, 15,0,0,0),
catchup=False,
) as dag:
waiting = ExternalTaskSensor(
task_id='waiting',
external_dag_id='teste_dagA',
external_task_id='end',
execution_delta=timedelta(minutes=2)
)
empty_end_task = EmptyOperator(task_id='empty-end-task')
waiting >> empty_end_task
Ich habe bereits einige Änderungen mit Execution_delta vorgenommen, aber ohne Erfolg.Ich versuche die Lösung für mein Problem zu verstehen