with TaskGroup(group_id='full_data_refresh') as full_data_refresh:
    for item in itertools.product(dag_config['src_config_id'], dag_config['ctry_config_id']):
        config_run_ids = f'{item}'
        exec_funs_mti_all_functions_run = PostgresOperator(
            task_id=f'exec_funs_mti_all_functions_run{config_run_ids}',
            postgres_conn_id='DDL_CONN_WRT_CSB',
            sql=f'select funs_mti_all_functions_run{config_run_ids};'
        )
So, I want the task with task_id f'exec_funs_mti_all_functions_run_{config_run_ids}' to run for every combination of src_config_id and ctry_config_id, that is:
select funs_mti_all_functions_run('1','1');
select funs_mti_all_functions_run('1','2');
select funs_mti_all_functions_run('1','6');
select funs_mti_all_functions_run('1','10');
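The intended mapping from combinations to task ids and SQL statements can be sketched outside Airflow as follows. This is only a minimal sketch: the values in dag_config are assumptions inferred from the four statements listed above, and build_task_specs is a hypothetical helper name, not part of Airflow. It unpacks each tuple and joins the ids with underscores, so the task_id contains only alphanumeric characters and underscores rather than the tuple's quotes, commas, and parentheses.

```python
import itertools

# Assumed example values, inferred from the four desired SQL statements;
# in the real DAG file these would come from the existing dag_config.
dag_config = {
    "src_config_id": ["1"],
    "ctry_config_id": ["1", "2", "6", "10"],
}


def build_task_specs(config):
    """Return one (task_id, sql) pair per (src, ctry) combination.

    Hypothetical helper for illustration only.
    """
    specs = []
    for src_id, ctry_id in itertools.product(
        config["src_config_id"], config["ctry_config_id"]
    ):
        # Unpack the tuple instead of formatting it whole, so the id
        # carries no quotes, commas, spaces, or parentheses.
        task_id = f"exec_funs_mti_all_functions_run_{src_id}_{ctry_id}"
        # The SQL call keeps the quoted arguments shown in the question.
        sql = f"select funs_mti_all_functions_run('{src_id}','{ctry_id}');"
        specs.append((task_id, sql))
    return specs


task_specs = build_task_specs(dag_config)
for task_id, sql in task_specs:
    print(task_id, "->", sql)
```

Each pair could then be passed as task_id and sql to a PostgresOperator inside the TaskGroup loop, in place of the single config_run_ids string.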
The Python file with the DAG definition, however, fails with the error message:
"airflow.exceptions.AirflowException: The key "exec_funs_mti_all_functions_run('1', '1')" ...
Can anyone help here?