Airflow BashOperator OSError: [Errno 2] No such file or directory


Post by Guest »

I keep getting the same error from a scheduled BashOperator that is currently backfilling (it is over a month "behind schedule").

Code: Select all

[2018-06-10 22:06:33,558] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run dag_name task_name 2018-03-14T00:00:00 --job_id 50 --raw -sd DAGS_FOLDER/dag_file.py']
Traceback (most recent call last):
File "/anaconda/bin//airflow", line 27, in 
args.func(args)
File "/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 387, in run
run_job.run()
File "/anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 198, in run
self._execute()
File "/anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 2512, in _execute
self.task_runner.start()
File "/anaconda/lib/python2.7/site-packages/airflow/task_runner/bash_task_runner.py", line 29, in start
self.process = self.run_command(['bash', '-c'], join_args=True)
File "/anaconda/lib/python2.7/site-packages/airflow/task_runner/base_task_runner.py", line 120, in run_command
universal_newlines=True
File "/anaconda/lib/python2.7/subprocess.py", line 394, in __init__
errread, errwrite)
File "/anaconda/lib/python2.7/subprocess.py", line 1047, in _execute_child
raise child_exception
OSError: [Errno 2] No such file or directory
[2018-06-10 22:06:33,633] {sequential_executor.py:47} ERROR - Failed to execute task Command 'airflow run dag_name task_name 2018-03-14T00:00:00 --local -sd /var/lib/airflow/dags/dag_file.py' returned non-zero exit status 1.
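For anyone unfamiliar with that traceback shape: the `OSError: [Errno 2]` comes straight out of `subprocess`, and it means the executable being spawned could not be resolved. A minimal illustrative sketch (not Airflow code) of the same failure mode:

```python
import subprocess

# Illustrative sketch (not Airflow code): subprocess.Popen raises
# OSError: [Errno 2] when the executable it is asked to start cannot
# be resolved -- the same shape of failure as the traceback above.
def try_spawn(argv):
    try:
        proc = subprocess.Popen(argv, universal_newlines=True)
        proc.wait()
        return 'ok'
    except OSError as exc:
        return 'OSError: %s' % exc.strerror

print(try_spawn(['bash', '-c', 'true']))   # 'ok' when bash is resolvable
print(try_spawn(['no-such-binary-xyz']))   # 'OSError: No such file or directory'
```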
I remember seeing something suggesting this might be a permissions problem, but I can't figure out which permissions could be involved.

I'm using a systemd setup, and, at my wit's end, I decided to run the Airflow webserver and scheduler as root.

I can take the list in the first line and type it verbatim into an ipython shell as arguments to a subprocess.Popen instance (just as in airflow/task_runner/base_task_runner.py; save no envs), and it not only runs, it correctly informs the airflow database that the task is complete. I can do this as the airflow, root, or ubuntu user.
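That ipython check reduces to something like the following sketch of what `base_task_runner.run_command` does with `join_args=True` (the real call joins the full `airflow run ...` string after `['bash', '-c']` and also wires up logging; `echo done` stands in for the real command here):

```python
import subprocess

# Sketch of run_command(['bash', '-c'], join_args=True): the command
# words are joined into one string and appended after ['bash', '-c'].
# 'echo done' stands in for the real 'airflow run ...' string.
full_cmd = ['bash', '-c'] + [' '.join(['echo', 'done'])]
proc = subprocess.Popen(
    full_cmd,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    universal_newlines=True,
)
output, _ = proc.communicate()
print(output.strip())  # -> done
```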

I have /anaconda/bin added to PATH in .bashrc for airflow, root and ubuntu, as well as in /etc/bash.bashrc, in addition to the AIRFLOW_HOME value, which is also in my environment file /etc/airflow.

This is what my systemd entry looks like:

Code: Select all

[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
EnvironmentFile=/etc/airflow
User=root
Group=root
Type=simple
ExecStart=/anaconda/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target
My env file:

Code: Select all

PATH=$PATH:/anaconda/bin/
AIRFLOW_HOME=/var/lib/airflow
AIRFLOW_CONFIG=$AIRFLOW_HOME/airflow.cfg
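One thing worth noting about that env file (a documented systemd behavior, not something this post confirms as the cause): `EnvironmentFile` lines are read literally, with no shell expansion, so `$PATH` and `$AIRFLOW_HOME` stay as literal text in the resulting values. A sketch that parses such a file the same literal way systemd does:

```python
# systemd reads EnvironmentFile lines without shell expansion, so a
# line like PATH=$PATH:/anaconda/bin/ yields a value containing the
# literal text "$PATH". This sketch parses a file the same literal way.
def parse_env_file(text):
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        env[key.strip()] = value  # no $VAR expansion, as in systemd
    return env

sample = """\
PATH=$PATH:/anaconda/bin/
AIRFLOW_HOME=/var/lib/airflow
AIRFLOW_CONFIG=$AIRFLOW_HOME/airflow.cfg
"""
env = parse_env_file(sample)
print(env['PATH'])  # literal '$PATH:/anaconda/bin/' -- "$PATH" is not a real directory
```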
Using apache-airflow==1.9.0 and desperately looking for a solution. Many thanks in advance.

Airflow.cfg:

Code: Select all

[core]
airflow_home = /var/lib/airflow
dags_folder = /var/lib/airflow/dags
base_log_folder = /var/lib/airflow/logs
remote_log_conn_id =
encrypt_s3_logs = False
logging_level = INFO
logging_config_class =
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
executor = SequentialExecutor
sql_alchemy_conn = {actual value hidden}
sql_alchemy_pool_size = 5
sql_alchemy_pool_recycle = 3600
parallelism = 4
dag_concurrency = 2
dags_are_paused_at_creation = True
non_pooled_task_slot_count = 16
max_active_runs_per_dag = 1
load_examples = False
plugins_folder = /var/lib/airflow/plugins
fernet_key = {actual value hidden}
donot_pickle = False
dagbag_import_timeout = 30
task_runner = BashTaskRunner
default_impersonation =
security =
unit_test_mode = False
task_log_reader = file.task
enable_xcom_pickling = True
killed_task_cleanup_time = 60
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[api]
auth_backend = airflow.api.auth.backend.default
[operators]
default_owner = root
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[webserver]
base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 60
secret_key = temporary_key
workers = 1
worker_class = sync
access_logfile = -
error_logfile = -
expose_config = False
authenticate = False
filter_by_owner = False
owner_mode = user
dag_default_view = tree
dag_orientation = LR
demo_mode = False
log_fetch_timeout_sec = 5
hide_paused_dags_by_default = False
page_size = 100
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = [email protected]
[celery]
...
[dask]
cluster_address = 127.0.0.1:8786
[scheduler]
job_heartbeat_sec = 120
scheduler_heartbeat_sec = 120
run_duration = -1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 300
child_process_log_directory = /var/lib/airflow/logs/scheduler
scheduler_zombie_task_threshold = 900
catchup_by_default = True
max_tis_per_query = 0
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
max_threads = 1
authenticate = False
[ldap]
...
[mesos]
...
[kerberos]
...
[github_enterprise]
...
[admin]
hide_sensitive_variable_fields = True
Adding ls -hal:

Code: Select all

root@ubuntu:/var/lib/airflow# ls -hal /var
total 52K
drwxr-xr-x 13 root root   4.0K Jun  3 11:58 .
root@ubuntu:/var/lib/airflow# ls -hal /var/lib
total 164K
drwxr-xr-x 42 root     root     4.0K Jun 10 19:00 .
root@ubuntu:/var/lib/airflow# ls -hal
total 40K
drwxr-xr-x  4 airflow airflow 4.0K Jun 11 06:41 .
drwxr-xr-x 42 root    root    4.0K Jun 10 19:00 ..
-rw-r--r--  1 airflow airflow  13K Jun 11 06:41 airflow.cfg
-rw-r--r--  1 airflow airflow  579 Jun 10 19:00 airflow.conf
drwxr-xr-x  2 airflow airflow 4.0K Jun 10 21:27 dags
drwxr-xr-x  4 airflow airflow 4.0K Jun 10 20:31 logs
-rw-r--r--  1 airflow airflow 1.7K Jun 10 19:00 unittests.cfg
root@ubuntu:/var/lib/airflow# ls -hal dags/
total 16K
drwxr-xr-x 2 airflow airflow 4.0K Jun 10 21:27 .
drwxr-xr-x 4 airflow airflow 4.0K Jun 11 06:41 ..
-rw-r--r-- 1 airflow airflow 3.4K Jun 10 21:26 dag_file.py
-rw-r--r-- 1 airflow airflow 1.7K Jun 10 21:27 dag_file.pyc
and the contents of dag_file.py:

Code: Select all

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'root',
    'run_as': 'root',
    'depends_on_past': True,
    'start_date': datetime(2018, 2, 20),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'end_date': datetime(2018, 11, 15),
}

env = {
    'PSQL': '{obscured}',
    'PATH': '/anaconda/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin',
    'PWD': '/home/ubuntu/{obs1}/',
    'HOME': '/home/ubuntu',
    'PYTHONPATH': '/home/ubuntu/{obs1}',
}

dag = DAG(
    'dag_name',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1))

t1 = BashOperator(
    env=env,
    task_id='dag_file',
    bash_command='export PYTHONPATH=/home/ubuntu/{obs1} && /anaconda/bin/ipython $PYTHONPATH/{obs2}/{obs3}.py {{ ds }}',
    dag=dag)
And I remind you that this runs correctly as airflow, root and ubuntu: airflow run dag_name dag_file 2018-03-17T00:00:00 --job_id 55 --raw -sd DAGS_FOLDER/dag_file.py
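One detail about the operator above, in case it matters: when `env=` is given to BashOperator, it is passed through to the child process, which then sees only those variables instead of inheriting the parent's environment (the same semantics as `subprocess.Popen(env=...)`). A plain-subprocess sketch of that behavior:

```python
import subprocess

# A child started with env= sees ONLY these variables, not the
# parent's environment -- subprocess.Popen semantics, which is what
# BashOperator's env= parameter ultimately uses.
child_env = {'PATH': '/usr/bin:/bin', 'HOME': '/tmp'}
out = subprocess.check_output(
    ['sh', '-c', 'echo "$HOME"'],
    env=child_env,
    universal_newlines=True,
)
print(out.strip())  # -> /tmp, whatever the parent's HOME is
```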
