Gelegentlich 'notimplementedError' im Luftstrom mit Dockeroperator

Post a reply

Smilies
:) :( :oops: :chelo: :roll: :wink: :muza: :sorry: :angel: :read: *x) :clever:
View more smilies

BBCode is ON
[img] is ON
[flash] is OFF
[url] is ON
Smilies are ON

Topic review
   

Expand view Topic review: Gelegentlich 'notimplementedError' im Luftstrom mit Dockeroperator

by Anonymous » 14 Feb 2025, 14:55

Ich verwende den Celeryexecutor und Dockeroperator , um meine Aufgaben im Luftstrom auszuführen. Die Docker -Container werden auf einem Dind -Dienst ausgeführt, der im selben K8s -Cluster wie Luftstrom ausgeführt wird. Mit unterschiedlichen Parametern. Der Fehler ist zufällig und nicht konsistent. Manchmal scheitern die meisten Aufgaben mit dem Fehler, manchmal nur wenige. Wenn Sie eine Aufgabe wiederholen, die genug Mal ausgefallen ist, gelingt dies. < /P>

Code: Select all

[2025-02-14T09:59:35.485+0000] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2025-02-14T09:59:35.507+0000] {taskinstance.py:2217} INFO - Executing  on 2025-02-14 09:56:38.717801+00:00
[2025-02-14T09:59:35.514+0000] {standard_task_runner.py:60} INFO - Started process 439369 to run task
[2025-02-14T09:59:35.517+0000] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'my_dag', 'my_task', 'manual__2025-02-14T09:56:38.717801+00:00', '--job-id', '105389', '--raw', '--subdir', 'DAGS_FOLDER/dynamic_dag.py', '--cfg-path', '/tmp/tmpre_1wgu6']
[2025-02-14T09:59:35.519+0000] {standard_task_runner.py:88} INFO - Job 105389: Subtask my_task
[2025-02-14T09:59:36.007+0000] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 1175, in execute
raise NotImplementedError()
NotImplementedError
[2025-02-14T09:59:36.073+0000] {standard_task_runner.py:107} ERROR - Failed to execute job 105389 for task my_task (; 439369)
[2025-02-14T09:59:36.091+0000] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2025-02-14T09:59:36.153+0000] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check
< /code>
Die zugehörige Sellerie -Aufgabe ist erfolgreich (basierend auf dem Blütenüberwachungswerkzeug), aber der Luftstrom erhöht die obige Ausnahme, und ich bemerke, dass am Dind -Service kein Container gestartet wurde. Wir sehen auch das folgende Protokoll vom Scheduler: < /p>
TaskInstance Finished: dag_id=my_dag, task_id=my_task, run_id=manual__2025-02-14T09:41:21.943302+00:00, map_index=-1, run_start_date=2025-02-14 09:41:26.398775+00:00, run_end_date=2025-02-14 09:41:27.013421+00:00, run_duration=0.614646, state=failed, executor_state=success, try_number=1, max_tries=0, job_id=105371, pool=default_pool, queue=default, priority_weight=10, operator=DockerOperator, queued_dttm=2025-02-14 09:41:23.067512+00:00, queued_by_job_id=97598, pid=438349
Ich kann erkennen, wann die Aufgabe fehlschlägt, da keine Protokolle von docker.py vorhanden sind, d. H. Das erste Protokoll zeigt an, dass der Container gestartet wird.

Code: Select all

[2025-02-14, 09:56:42 UTC] {docker.py:486} INFO - Pulling docker image example/image:1.2.3
< /code>
Beachten Sie, dass dieser Fehler zuvor bei einer anderen DAG aufgetreten ist, aber er scheint bei DAG X sehr häufig zu sein . < /p>

Update: Ich habe gerade bemerkt, dass eine Aufgabe als Python -Aufgabe definiert ist und nicht als Dockeroperator. Diese Aufgabe leidet auch unter dem zufälligen NotimplementedError 
. Ich vermute also, dass es nicht mit dem Bediener zusammenhängt. Das Gleiche tritt bei der Verwendung von Kubernetesexecutor auch auf.

Top