How to fix org.apache.spark.SparkException: Job aborted due to stage failure Task & com.datastax.spark.c


Post by Anonymous »

In my project I use the Spark-Cassandra connector to read data from a Cassandra table and process it further as a JavaRDD, but I run into a problem while converting the Cassandra rows into the JavaRDD.

Code:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 52, 172.20.0.4, executor 1):
java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:370)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I have configured Spark to use a Spark cluster. When I use local as the master, the code works fine, but as soon as I replace it with the cluster master, the problem appears.
Here is my Spark configuration:

Code:

SparkConf sparkConf = new SparkConf().setAppName("Data Transformation")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("spark://masterip:7077");

sparkConf.set("spark.cassandra.connection.host", cassandraContactPoints);
sparkConf.set("spark.cassandra.connection.port", cassandraPort);
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
sparkConf.set("spark.driver.allowMultipleContexts", "true");

/*
* sparkConf.set("spark.cassandra.auth.username", "centralrw");
* sparkConf.set("spark.cassandra.auth.password", "t8b9HRWy");
*/
logger.info("creating spark context object");
sparkContext = new JavaSparkContext(sparkConf);
logger.info("returning sparkcontext object");
return sparkContext;
Spark version – 2.4.0
spark-cassandra-connector – 2.4.0
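
The first stack trace says that com.datastax.spark.connector.rdd.partitioner.CassandraPartition cannot be found on an executor, which usually means the connector jar lives only on the driver's classpath and is never shipped to the workers. Below is a minimal sketch of how the dependency could be distributed from the driver configuration; the Maven coordinates and the jar path are assumptions that would have to match the actual build (for Spark 2.4.0 the connector is typically the Scala 2.11 artifact).

Code:

SparkConf sparkConf = new SparkConf()
        .setAppName("Data Transformation")
        .setMaster("spark://masterip:7077")
        // Assumption: the job is launched via spark-submit, so this asks Spark to resolve
        // the connector from Maven and put it on the driver and executor classpaths.
        .set("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.4.0")
        // Hypothetical path to a thin application jar; shipping it means classes such as
        // the anonymous Function (ReceiverConfig$1) also reach the executors.
        .setJars(new String[] { "/path/to/data-transformation.jar" });

The same effect can usually be had on the command line with spark-submit --packages and --jars, without baking everything into a fat jar.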

ReceiverConfig:

Code:

public List<Map<String, GenericTriggerEntity>> readDataFromGenericTriggerEntityUsingSpark(
        JavaSparkContext sparkContext) {

    List<Map<String, GenericTriggerEntity>> genericTriggerEntityList = new ArrayList<>();

    try {

        logger.info("Keyspace & table name to read data from cassandra");
        String tableName = "generictriggerentity";
        String keySpace = "centraldatalake";

        logger.info("establishing connection");
        CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext)
                .cassandraTable(keySpace, tableName);
        int num = cassandraRDD.getNumPartitions();
        System.out.println("num- " + num);

        logger.info("Converting extracted rows to JavaRDD");
        JavaRDD<Map<String, GenericTriggerEntity>> rdd = cassandraRDD
                .map(new Function<CassandraRow, Map<String, GenericTriggerEntity>>() {

                    private static final long serialVersionUID = -165799649937652815L;

                    @Override
                    public Map<String, GenericTriggerEntity> call(CassandraRow row) throws Exception {
                        Map<String, GenericTriggerEntity> genericTriggerEntityMap = new HashMap<>();
                        GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
                        if (row.getString("end") != null)
                            genericTriggerEntity.setEnd(row.getString("end"));
                        if (row.getString("key") != null)
                            genericTriggerEntity.setKey(row.getString("key"));
                        if (row.getString("keyspacename") != null)
                            genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
                        if (row.getString("partitiondeleted") != null)
                            genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
                        if (row.getString("rowdeleted") != null)
                            genericTriggerEntity.setRowdeleted(row.getString("rowdeleted"));
                        if (row.getString("rows") != null)
                            genericTriggerEntity.setRows(row.getString("rows"));
                        if (row.getString("start") != null)
                            genericTriggerEntity.setStart(row.getString("start"));
                        if (row.getString("tablename") != null) {
                            genericTriggerEntity.setTablename(row.getString("tablename"));
                            dataTableName = row.getString("tablename");
                        }
                        if (row.getString("triggerdate") != null)
                            genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
                        if (row.getString("triggertime") != null)
                            genericTriggerEntity.setTriggertime(row.getString("triggertime"));
                        if (row.getString("uuid") != null)
                            genericTriggerEntity.setUuid(row.getUUID("uuid"));

                        genericTriggerEntityMap.put(dataTableName, genericTriggerEntity);
                        return genericTriggerEntityMap;
                    }

                });

        List<Partition> partition = rdd.partitions();
        System.out.println("partition - " + partition.size());

        logger.info("Collecting data into rdd");
        genericTriggerEntityList = rdd.collect();

    } catch (Exception e) {
        e.printStackTrace();
    }
    logger.info("returning generic trigger entity list");
    return genericTriggerEntityList;
}
When I execute rdd.collect(), the following exception is thrown:

Code:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 21, 10.22.3.55, executor 0): java.lang.ClassNotFoundException: in.dmart.central.data.transform.base.config.ReceiverConfig$1
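
ReceiverConfig$1 in this trace is the compiler's name for the anonymous Function defined inside readDataFromGenericTriggerEntityUsingSpark, so the executors are missing the application's own classes as well, not only the connector. One way to avoid the anonymous class altogether, sketched below under the assumption that GenericTriggerEntity is a serializable JavaBean whose property names match the column names, is to let the connector map rows straight to the entity:

Code:

// Sketch only: CassandraJavaUtil.mapRowTo maps columns onto matching bean properties
// of GenericTriggerEntity (an assumption about how the entity class is written).
JavaRDD<GenericTriggerEntity> entities = CassandraJavaUtil.javaFunctions(sparkContext)
        .cassandraTable("centraldatalake", "generictriggerentity",
                CassandraJavaUtil.mapRowTo(GenericTriggerEntity.class));

Even then the executors still need the connector and the application jar on their classpath, so this complements rather than replaces the classpath configuration above.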
I found the suggestion to build a fat jar and submit that, but I would like to avoid it, because I would have to repeat the whole build every time I change something, and that is not practical.

Please suggest a way to configure this in the code or on the Spark cluster.
Thanks in advance.
