Hier ist die Pipeline-Struktur:
worker_options.sdk_container_image = '...'
with beam.Pipeline(options=pipeline_options) as p:
processed_data = (
p
| "ReadFiles" >> beam.Create(FILE_LIST)
| "ProcessFiles" >> beam.ParDo(ProcessAvroFileDoFn())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}",
schema=BQ_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
Wichtige Beobachtungen:
- Die Pipeline ist mit DirectRunner erfolgreich und schreibt Daten ohne Probleme in BigQuery.
Mit dem DataflowRunner wird die Pipeline ohne Fehler oder Warnungen abgeschlossen, aber: Es werden keine Zeilen in BigQuery geschrieben und große temporäre Dateien verbleiben im Bucket (z. B. bq_load/...). - Die verarbeiteten Daten sind gültiges NDJSON.
- Das BigQuery-Schema entspricht der Datenstruktur.
- Bei der Überprüfung der verbleibenden temporären Dateien habe ich die temporäre Datei heruntergeladen und überprüft, ob sie gültige NDJSON-Zeilen enthält. Das manuelle Hochladen dieser Datei in BigQuery mit dem bq-Load-Befehl funktioniert einwandfrei.
- Testen mit anderen Datensätzen:
Ich habe viele verschiedene Eingaben ausprobiert , aber das Problem besteht weiterhin. - Dataflow-Protokolle prüfen:
Ich habe mir die Protokolle in der Dataflow-Überwachungskonsole angesehen, aber keine Fehler gefunden oder Warnungen. - Anderes Dienstkonto: Ein Dienstkonto mit unzureichenden Datenflussberechtigungen löst einen Fehler aus. Es scheint daher unwahrscheinlich, dass das Problem in den Berechtigungen für die Arbeiter liegt.