Kann mir jemand helfen und erklären, wie man damit umgeht?
Code: Select all
// Reads the input file line by line and splits the lines into two tagged outputs:
// validTag (main output) for lines accepted by the check below, invalidTag for the rest.
// Assumes validTag and invalidTag are declared as TupleTag<String> -- TODO confirm.
PCollectionTuple results = pipeline
    .apply("Read from GCS", TextIO.read().from(inputFile))
    // The DoFn must be parameterized: with a raw DoFn, c.element() is typed as Object
    // and the assignment to a String below does not compile.
    .apply("Validate Header and Trailer", ParDo.of(new DoFn<String, String>() {
      /**
       * Routes one input line to either the valid or the invalid output.
       *
       * @param c   context giving access to the current input line
       * @param out receiver used to emit the line to one of the tagged outputs
       */
      @ProcessElement
      public void processElement(ProcessContext c, MultiOutputReceiver out) {
        String line = c.element();
        // Check for header or trailer
        boolean isHeader = line.startsWith("Header,");
        boolean isTrailer = line.startsWith("Trailer,");
        // NOTE(review): a line starting with "Header," or "Trailer," is never blank, so
        // the two flags cannot change the outcome -- every non-blank line is treated as
        // valid and only blank lines go to invalidTag. Confirm this is the intended rule.
        if (isHeader || isTrailer || !line.trim().isEmpty()) {
          out.get(validTag).output(line);
        } else {
          out.get(invalidTag).output(line);
        }
      }
    }).withOutputTags(validTag, TupleTagList.of(invalidTag)));
results.get(validTag)
.apply("Write Valid to GCS", TextIO.write().to(outputFile).withoutSharding());
results.get(invalidTag)
.apply("Write Invalid to GCS", TextIO.write().to(errorFile).withoutSharding());