I want to run a Dataflow Beam job with Airflow, which is hosted on Cloud Composer (GCP). This is the DAG:

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.utils.dates import days_ago
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DAG_ID = "dataflow_beam_test"
PROJECT_ID = "xxxxxx" # Replace with your project ID
GCS_BUCKET = "beam-test-extract" # Replace with your bucket
REGION = "europe-west8" # Replace with your region
PY_FILE = f"gs://{GCS_BUCKET}/dataflow-jobs/airflow_beam_test.py" # Updated to use the new pipeline file
default_args = {
    "owner": "Airflow",
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
}

with models.DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=None,  # Set your desired schedule
    tags=["dataflow"],
) as dag:
    run_python_job = BeamRunPythonPipelineOperator(
        task_id="task_beam_plan",
        runner="DataflowRunner",
        py_file=PY_FILE,
        py_options=[
            "--temp_location", f"gs://{GCS_BUCKET}/tmp",
            "--staging_location", f"gs://{GCS_BUCKET}/staging",
            "--save_main_session",  # This helps with pickling issues
        ],
        dataflow_config={
            "location": REGION,
            "job_name": "test_beam_plan",
            "project_id": PROJECT_ID,
            "gcp_conn_id": "google_cloud_default",
        },
    )
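For reference, my understanding of the Beam provider is that py_options is meant for options passed to the python3 interpreter itself (e.g. ["-m"]), while pipeline flags would normally go into the operator's pipeline_options dict (there is also default_pipeline_options). Below is only a minimal sketch of that variant, reusing the constants above; it is not the task definition I am actually running:

run_python_job = BeamRunPythonPipelineOperator(
    task_id="task_beam_plan",
    runner="DataflowRunner",
    py_file=PY_FILE,
    # Pipeline flags should end up after the script path on the command line
    pipeline_options={
        "temp_location": f"gs://{GCS_BUCKET}/tmp",
        "staging_location": f"gs://{GCS_BUCKET}/staging",
        "save_main_session": True,  # as far as I can tell, True becomes the bare flag --save_main_session
    },
    dataflow_config={
        "location": REGION,
        "job_name": "test_beam_plan",
        "project_id": PROJECT_ID,
        "gcp_conn_id": "google_cloud_default",
    },
)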
The Beam pipeline itself is:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def run_pipeline():
    # Define the pipeline options
    options = PipelineOptions(
        runner='DataflowRunner',
        project='xxxxx',
        temp_location='gs://beam-test-extract/tmp',
        region='europe-west8',
    )
    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Create a simple PCollection with three rows
        data = [
            {'id': 1, 'name': 'John', 'age': 30},
            {'id': 2, 'name': 'Alice', 'age': 25},
            {'id': 3, 'name': 'Bob', 'age': 35}
        ]
        # Create the pipeline
        (p
         | 'Create Data' >> beam.Create(data)
         | 'Write to File' >> beam.io.WriteToText(
             'gs://beam-test-extract/output/simple_output',
             file_name_suffix='.txt'
         )
        )

if __name__ == '__main__':
    run_pipeline()
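Since the operator appends its own flags (--runner, --job_name, --project, --region, plus whatever it forwards from the DAG) to the command line of the downloaded script, the more usual Beam pattern would be to parse the pipeline options from argv rather than hard-coding them. A minimal sketch of that pattern, adapted from the script above; this is not what I currently have deployed:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipeline(argv=None):
    # Separate script-specific arguments from Beam pipeline arguments
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)

    # runner, project, region, temp_location etc. now come from the command line
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as p:
        data = [
            {'id': 1, 'name': 'John', 'age': 30},
            {'id': 2, 'name': 'Alice', 'age': 25},
            {'id': 3, 'name': 'Bob', 'age': 35},
        ]
        (p
         | 'Create Data' >> beam.Create(data)
         | 'Write to File' >> beam.io.WriteToText(
             'gs://beam-test-extract/output/simple_output',
             file_name_suffix='.txt'))


if __name__ == '__main__':
    run_pipeline()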
However, I keep getting this obscure error when I inspect the task logs in Airflow:
airflow.exceptions.AirflowException: Apache Beam process failed with return code 2
For completeness, this is the full log:
[2025-05-21, 10:48:27 UTC] {gcs.py:359} INFO - File downloaded to /tmp/tmppzzv0o4_airflow_beam_test.py
[2025-05-21, 10:48:27 UTC] {credentials_provider.py:410} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2025-05-21, 10:48:33 UTC] {beam.py:312} INFO - Beam version: 2.64.0
[2025-05-21, 10:48:33 UTC] {beam.py:171} INFO - Running command: python3 --temp_location gs://beam-test-extract/tmp --staging_location gs://beam-test-extract/staging --save_main_session /tmp/tmppzzv0o4_airflow_beam_test.py --runner=DataflowRunner --job_name=test-beam-plan-f8da48d8 --project=xfarm-239313 --region=europe-west8 --labels=airflow-version=v2-10-5-composer
[2025-05-21, 10:48:33 UTC] {beam.py:182} INFO - Start waiting for Apache Beam process to complete.
[2025-05-21, 10:48:33 UTC] {beam.py:150} WARNING - unknown option --temp_location
[2025-05-21, 10:48:33 UTC] {beam.py:150} WARNING - usage: python3 [option] ... [-c cmd | -m mod | file | -] [arg] ...
[2025-05-21, 10:48:33 UTC] {beam.py:150} WARNING - Try `python -h' for more information.
[2025-05-21, 10:48:33 UTC] {beam.py:203} INFO - Process exited with return code: 2
[2025-05-21, 10:48:33 UTC] {taskinstance.py:3315} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 769, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 735, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 401, in execute
    return self.execute_on_dataflow(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 416, in execute_on_dataflow
    self.beam_hook.start_python_pipeline(
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 319, in start_python_pipeline
    self._start_pipeline(
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 237, in _start_pipeline
    run_beam_command(
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 206, in run_beam_command
    raise AirflowException(f"Apache Beam process failed with return code {proc.returncode}")
airflow.exceptions.AirflowException: Apache Beam process failed with return code 2
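Reading the "Running command" line above, the entries from py_options end up between python3 and the downloaded script path, and python3 rejects --temp_location as an unknown interpreter option. The same return code 2 should be reproducible outside Airflow with something like this (a minimal sketch; the trailing script path is only illustrative and does not even need to exist, since the interpreter fails before reaching it):

import subprocess

# Mimics the command shown in the log: pipeline flags placed before the
# script path are parsed as python3 interpreter options and rejected.
proc = subprocess.run(
    ["python3", "--temp_location", "gs://beam-test-extract/tmp", "airflow_beam_test.py"]
)
print(proc.returncode)  # prints 2, matching the Airflow log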
What I know:
- The Beam job runs fine when triggered locally, and I can see that the task completes successfully.
- The Cloud Composer service account has the following roles: Storage Admin, Dataflow Admin, Composer Worker, Service Account User.