Ich verwende ExternalTaskSensor und die Aufgabe stößt immer wieder aufPython

Python-Programme
Guest
 Ich verwende ExternalTaskSensor und die Aufgabe stößt immer wieder auf

Post by Guest »

Ich habe zwei DAGs, um die Verwendung von ExternalTaskSensor zu testen
Der ExternalTaskSensortask sagt jedoch ständig „Poking“ und wird nicht beendet.
Was könnte passieren?
Geben Sie hier die Bildbeschreibung ein
Das sind meine DAGs:

Code: Select all

from datetime import timedelta, datetime
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

dag_id = 'teste_dagA'

default_args = {
'owner': 'Engineering',
'retries': 2,
'retry_delay': timedelta(minutes=1),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
}

with DAG(
dag_id=dag_id,
tags=['teste', 'lakehouse', 'mediator'],
default_args=default_args,
description='teste',
schedule_interval=timedelta(minutes=3),
start_date=datetime(2025, 1, 15,0,0,0),
catchup=False,
) as dag:

empty_start_task = EmptyOperator(task_id='empty-start-task')

empty_end_task = BashOperator(
task_id='end',
bash_command='echo "Tarefa concluída com sucesso!"'
)

empty_start_task >> empty_end_task

Code: Select all

from datetime import timedelta, datetime
import pendulum
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

dag_id = 'teste_mediator'

default_args = {
'owner': 'Engineering',
'retries': 2,
'retry_delay': timedelta(minutes=1),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
}

with DAG(
dag_id=dag_id,
tags=['teste', 'lakehouse', 'mediator'],
default_args=default_args,
description='teste',
schedule_interval=timedelta(minutes=3),
start_date=datetime(2025, 1, 15,0,0,0),
catchup=False,
) as dag:

waiting = ExternalTaskSensor(
task_id='waiting',
external_dag_id='teste_dagA',
external_task_id='end',
execution_delta=timedelta(minutes=2)
)

empty_end_task = EmptyOperator(task_id='empty-end-task')

waiting >> empty_end_task

Wenn jemand das Problem versteht, bin ich dafür dankbar.
Ich habe bereits einige Änderungen mit Execution_delta vorgenommen, aber ohne Erfolg.Ich versuche die Lösung für mein Problem zu verstehen

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post