Warum schreibt meine Apache Beam Dataflow-Pipeline nicht in BigQuery?
Posted: 24 Dec 2024, 09:43
Ich arbeite an einer Apache Beam-Pipeline, die Daten verarbeitet und in BigQuery schreibt. Die Pipeline funktioniert einwandfrei, wenn ich DirectRunner verwende, aber wenn ich zum DataflowRunner wechsle, wird sie ohne Fehler oder Warnungen abgeschlossen, fügt aber keine Zeilen in BigQuery ein. Außerdem sehe ich große übrig gebliebene Dateien im temporären Verzeichnis meines Cloud Storage-Buckets (gs://my-bucket/temp/bq_load/...), und in der Zieltabelle werden keine Daten angezeigt.
Hier ist die Pipeline-Struktur:
worker_options.sdk_container_image = '...'
with beam.Pipeline(options=pipeline_options) as p:
processed_data = (
p
| "ReadFiles" >> beam.Create(FILE_LIST)
| "ProcessFiles" >> beam.ParDo(ProcessAvroFileDoFn())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}",
schema=BQ_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
Wichtige Beobachtungen:
Hier ist die Pipeline-Struktur:
worker_options.sdk_container_image = '...'
with beam.Pipeline(options=pipeline_options) as p:
processed_data = (
p
| "ReadFiles" >> beam.Create(FILE_LIST)
| "ProcessFiles" >> beam.ParDo(ProcessAvroFileDoFn())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}",
schema=BQ_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
Wichtige Beobachtungen:
- Die Pipeline ist mit DirectRunner erfolgreich und schreibt Daten ohne Probleme in BigQuery.
Mit dem DataflowRunner wird die Pipeline ohne Fehler oder Warnungen abgeschlossen, aber: Es werden keine Zeilen in BigQuery geschrieben und große temporäre Dateien verbleiben im Bucket (z. B. bq_load/...). - Die verarbeiteten Daten sind gültiges NDJSON.
- Das BigQuery-Schema entspricht der Datenstruktur.
- Bei der Überprüfung der verbleibenden temporären Dateien habe ich die temporäre Datei heruntergeladen und überprüft, ob sie gültige NDJSON-Zeilen enthält. Das manuelle Hochladen dieser Datei in BigQuery mit dem bq-Load-Befehl funktioniert einwandfrei.
- Testen mit anderen Datensätzen:
Ich habe viele verschiedene Eingaben ausprobiert , aber das Problem besteht weiterhin. - Dataflow-Protokolle prüfen:
Ich habe mir die Protokolle in der Dataflow-Überwachungskonsole angesehen, aber keine Fehler gefunden oder Warnungen. - Anderes Dienstkonto: Ein Dienstkonto mit unzureichenden Datenflussberechtigungen löst einen Fehler aus. Es scheint daher unwahrscheinlich, dass das Problem in den Berechtigungen für die Arbeiter liegt.