Ist es möglich, Daten von Spark Executors in Java Spark zu schreiben?Java

Java-Forum
Anonymous
 Ist es möglich, Daten von Spark Executors in Java Spark zu schreiben?

Post by Anonymous »

Ich habe eine Java -Spark -Anwendung, die Daten von Kafka erhält, einige Arbeiten an den Daten ausführt und dann mit dem Befehl toundswrite () Parquetdateien in S3 speichert. Bis zu diesem Zeitpunkt spart meine App alle empfangenen Daten in Spark -Treiber und speicherte dann Daten mit der aktuellen Spark -Sitzung. Was gut funktioniert.

Code: Select all

public static void main(String[] args) throws Exception {
... // setting configs
Processing pr = new Processing(...); // initialising all the classes here
pr.run();
}
Verarbeitungsklasse

Code: Select all

private DummyClass dummyClass;

// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
myData.collectAsList().forEach(row -> {
String myField = row.getAs("colName")
... //some more work
dummyClass.createParquet(myField);
});
}
DummyClass -Klasse

Code: Select all

private SparkUtility sparkUtility;

// constructor here

public void createParquet(String myField) {
List rowVals = new ArrayList();
StructType schema = createSchema();
...// some work to populate rowVals list
String s3Path = "s3a://bucket/key/key/";
sparkUtility.writeParquet(rowVals,schema,s3Path);
}

private StructType createSchema() {
StructType structType = new StructType();
structType = structType.add("col1", DataTypes.StringType, false);
structType = structType.add("col1w", DataTypes.StringType, false);
return structType;
}
Spark Utility Class

Code: Select all

private SparkSession session;

// constructor here

private SparkSession getSparkSession() {
SparkConf sparkConf = new SparkConf()
.setAppName("myName")
// further settings here
.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
return SparkSession.builder().config(sparkConf).getOrCreate();
}

public void writeParquet(List entries, StructType structType,String path) {
session.createDataFrame(entries,structType)
.write().mode("overwrite").format("parquet").save(path);
}
< /code>
Dies funktioniert und es ist in Ordnung. [url=viewtopic.php?t=14917]Ich möchte[/url] jetzt jedoch die Verarbeitung 
Klasse als SO ändern:

Code: Select all

// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
kafkaDF.foreachPartition(partition -> {
DummyClass dummy = new DummyClass(...); // initialising classes in executors
partition.forEachRemaining(record -> {
String myField = row.getAs("colName");
... //some more work
dummyClass.createParquet(myField);
});
});
< /code>
Der Rest des Codes ist derzeit unverändert. Der Code wird gut ausgeführt, kann jedoch Daten speichern und die folgende Ausnahme ausgelegt: < /p>
Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null
< /code>
Aus dem, was ich verstehe, liegt das daran, dass ich versuche, die Spark Session in Executors zu verwenden. Wie kann ich den Datensatz in Parquet umwandeln und in S3 speichern? Wenn es eine Möglichkeit gibt, auf Sitzung zuzugreifen und sie mit der Methode von .Write () 
zu sagen, dass Daten mit .Write () gespeichert werden sollen? Und verschiedene Versuche, die Sitzung zu holen, führt zu dem gleichen Fehler.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post