Code: Select all
public static void main(String[] args) throws Exception {
... // setting configs
Processing pr = new Processing(...); // initialising all the classes here
pr.run();
}
Code: Select all
private DummyClass dummyClass;
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
myData.collectAsList().forEach(row -> {
String myField = row.getAs("colName")
... //some more work
dummyClass.createParquet(myField);
});
}
Code: Select all
private SparkUtility sparkUtility;
// constructor here
public void createParquet(String myField) {
List rowVals = new ArrayList();
StructType schema = createSchema();
...// some work to populate rowVals list
String s3Path = "s3a://bucket/key/key/";
sparkUtility.writeParquet(rowVals,schema,s3Path);
}
private StructType createSchema() {
StructType structType = new StructType();
structType = structType.add("col1", DataTypes.StringType, false);
structType = structType.add("col1w", DataTypes.StringType, false);
return structType;
}
Code: Select all
private SparkSession session;
// constructor here
private SparkSession getSparkSession() {
SparkConf sparkConf = new SparkConf()
.setAppName("myName")
// further settings here
.set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
return SparkSession.builder().config(sparkConf).getOrCreate();
}
public void writeParquet(List entries, StructType structType,String path) {
session.createDataFrame(entries,structType)
.write().mode("overwrite").format("parquet").save(path);
}
< /code>
Dies funktioniert und es ist in Ordnung. [url=viewtopic.php?t=14917]Ich möchte[/url] jetzt jedoch die Verarbeitung
Code: Select all
// constructor is here
public void run() {
... // some work and fetching data
Dataset myData = ... // selecting and preparing the data
kafkaDF.foreachPartition(partition -> {
DummyClass dummy = new DummyClass(...); // initialising classes in executors
partition.forEachRemaining(record -> {
String myField = row.getAs("colName");
... //some more work
dummyClass.createParquet(myField);
});
});
< /code>
Der Rest des Codes ist derzeit unverändert. Der Code wird gut ausgeführt, kann jedoch Daten speichern und die folgende Ausnahme ausgelegt: < /p>
Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null
< /code>
Aus dem, was ich verstehe, liegt das daran, dass ich versuche, die Spark Session in Executors zu verwenden. Wie kann ich den Datensatz in Parquet umwandeln und in S3 speichern? Wenn es eine Möglichkeit gibt, auf Sitzung zuzugreifen und sie mit der Methode von .Write ()