Wie verschiebt ich Dateien von einem Eimer in GCP von einem Eimer zu einem anderen, basierend auf Bedingungen mit Java?
Posted: 05 Mar 2025, 07:20
Ich versuche, eine CSV -Datei aus einem GCS -Bucket zu lesen, bestätigt, ob die Datei basierend auf einer Bedingung basiert, und schreibt dann eine gültige Datei in einen GCS -Ordner und eine ungültige Datei an eine andere. Aber was auch immer ich probiere, die Datei wird sowohl im Ordner gespeichert.
Kann mir jemand helfen, wie man damit geht?
Kann mir jemand helfen, wie man damit geht?
Code: Select all
PCollectionTuple results = pipeline
.apply("Read from GCS", TextIO.read().from(inputFile))
.apply("Validate Header and Trailer", ParDo.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c, MultiOutputReceiver out) {
String line = c.element();
// Check for header or trailer
boolean isHeader = line.startsWith("Header,");
boolean isTrailer = line.startsWith("Trailer,");
if (isHeader || isTrailer || !line.trim().isEmpty()) {
out.get(validTag).output(line);
} else {
out.get(invalidTag).output(line);
}
}
}).withOutputTags(validTag, TupleTagList.of(invalidTag)));
results.get(validTag)
.apply("Write Valid to GCS", TextIO.write().to(outputFile).withoutSharding());
results.get(invalidTag)
.apply("Write Invalid to GCS", TextIO.write().to(errorFile).withoutSharding());