Ist es möglich, Daten von Spark Executors in Java Spark zu schreiben?
Posted: 27 Mar 2025, 15:45
Ich habe eine Java -Spark -Anwendung, die Daten von Kafka erhält, einige Arbeiten an den Daten ausführt und dann mit dem Befehl toundswrite () Parquetdateien in S3 speichert. Bis zu diesem Zeitpunkt spart meine App alle empfangenen Daten in Spark -Treiber und speicherte dann Daten mit der aktuellen Spark -Sitzung. Was gut funktioniert.
Verarbeitungsklasse
DummyClass -Klasse
Spark Utility Class
Klasse als SO ändern:
zu sagen, dass Daten mit .Write () gespeichert werden sollen? Und verschiedene Versuche, die Sitzung zu holen, führt zu dem gleichen Fehler.
Code: Select all
public static void main(String[] args) throws Exception {
... // setting configs
Processing pr = new Processing(...); // initialising all the classes here
pr.run();
}
Code: Select all
private DummyClass dummyClass;
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
myData.collectAsList().forEach(row -> {
String myField = row.getAs("colName")
... //some more work
dummyClass.createParquet(myField);
});
}
Code: Select all
private SparkUtility sparkUtility;
// constructor here
public void createParquet(String myField) {
List rowVals = new ArrayList();
StructType schema = createSchema();
...// some work to populate rowVals list
String s3Path = "s3a://bucket/key/key/";
sparkUtility.writeParquet(rowVals,schema,s3Path);
}
private StructType createSchema() {
StructType structType = new StructType();
structType = structType.add("col1", DataTypes.StringType, false);
structType = structType.add("col1w", DataTypes.StringType, false);
return structType;
}
Code: Select all
private SparkSession session;
// constructor here
private SparkSession getSparkSession() {
SparkConf sparkConf = new SparkConf()
.setAppName("myName")
// further settings here
.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
return SparkSession.builder().config(sparkConf).getOrCreate();
}
public void writeParquet(List entries, StructType structType,String path) {
session.createDataFrame(entries,structType)
.write().mode("overwrite").format("parquet").save(path);
}
< /code>
Dies funktioniert und es ist in Ordnung. [url=viewtopic.php?t=14917]Ich möchte[/url] jetzt jedoch die Verarbeitung
Code: Select all
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
kafkaDF.foreachPartition(partition -> {
DummyClass dummy = new DummyClass(...); // initialising classes in executors
partition.forEachRemaining(record -> {
String myField = row.getAs("colName");
... //some more work
dummyClass.createParquet(myField);
});
});
< /code>
Der Rest des Codes ist derzeit unverändert. Der Code wird gut ausgeführt, kann jedoch Daten speichern und die folgende Ausnahme ausgelegt: < /p>
Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null
< /code>
Aus dem, was ich verstehe, liegt das daran, dass ich versuche, die Spark Session in Executors zu verwenden. Wie kann ich den Datensatz in Parquet umwandeln und in S3 speichern? Wenn es eine Möglichkeit gibt, auf Sitzung zuzugreifen und sie mit der Methode von .Write ()