Wie kann man einen Spark -Datensatz durchführen und einen Spaltenwert in Java aktualisieren?Java

Java-Forum
Anonymous
 Wie kann man einen Spark -Datensatz durchführen und einen Spaltenwert in Java aktualisieren?

Post by Anonymous »

Ich arbeite an POC, wo ich die Kontonummer in DB mit Token aktualisieren muss. Ich las die Daten in einen Datensatz -DSRecords (ca. 2M Datensätze). Ich habe eine weitere Routine, die verschiedene Kontonummern erfasst und die Token erhalten hat. Die Zuordnung wird in HashMap gespeichert. < /p>

Dataset applySwappedTokens(Dataset dsRecords, Map mappedTokens){
}
< /code>

Jetzt muss ich über den Datensatz durchlaufen, um Folgendes zu machen -
1. Lesen Sie die Kontennummer Spalte (AccountNumber) Wert und Update (ich weiß, dass der Datensatz unveränderlich ist. Aktualisieren des Datensatzes bedeutet also, dass der Datensatz Kopie von Datensatz mit aktualisierten Zeilen erstellt) mit dem Token -Wert von Mapappokens. Dies kann durch Join oder andere Operationen erreicht werden, aber ich verbringe keine Anstrengungen dafür aufgrund der 2. Aufgabe.
2. Lesen Sie eine andere XML-Blob-Spalte und finden Sie die Kontonummer und aktualisieren Sie sie. Nicht-serialisierbarer Code. Die meisten Online -Ressourcen sind in Scala und nicht in Java. Bitte helfen Sie. Aufgrund des Serialisierungsfehlers. < /strong> < /p>

Dataset output = sparkSession.sqlContext().createDataFrame(dsRecords.javaRDD().map(row -> {
return RowFactory.create(row.get(0), row.get(1), row.get(2), swapToken(row.get(3)),row.get(4));
}), dsRecords.schema());

return output;

String swapToken(Object inputToken) {
return mappedTokens.get(inputToken);//mappedToken will have to be instance field.
}
< /code>

Ansatz2- unvollständig. < /strong> < /p>

dsRecords.foreach((ForeachFunction) row -> {
Integer index = row.fieldIndex("accountNumber");
String pan = row.getString(index);
String swap = this.swapToken(pan);
//TODO: create a dataset with rows from dsRecords but swap value.

});
< /code>

Ansatz 3 - Verwenden Sie UDF mit der Kartenfunktion < /strong> < /p>

Erstellen Sie ein UDF2 (das Nimmt 2 Eingabeparameter nämlich AccountNumber und kartiert und gibt Token zurück). Es scheint, dass UDF nur Spaltenwerte aufnehmen kann < /p>

Update 1 - UDF < /strong>
Also habe ich UDF implementiert (AFK, wird den Code veröffentlichen später):
1. Definiertes UDF1 "UpdateToken", um den XML -Spaltenwert zu übergeben und den aktualisierten XML -Wert zurückzugeben.
2. Die HashMap-Instanz "MapappoKens", die das Account-gepflegtes Paar Mapping hat, wird statisch gemacht. Es wird in meiner UDF -Funktion zugegriffen, um das Konto in XML -Zeichenfolge zu finden und mit dem Token zu aktualisieren. < /p>

Ich könnte meine applyWaptKingokens -Funktion testen, die das obige UDF im Datensatz "withColumn" aufruft. Wenn ich das Spark -Programm jedoch ausführe, sehe ich, dass „kartierte“ Daten von „Null“ enthält, und daher wird die XML -Spalte mit leeren Daten aktualisiert. Ich denke, die statischen „kartierten“ befinden sich entweder in einem anderen JVM oder in einem anderen Treiber (selbst in lokaler Ebene erzeugt der Funke einen isolierten Treiber, der Testamentsvollstrecker). Es ist frustrierend, dass es keine einfache Lösung gibt, um Zeilen in Spark zu iterieren und zu aktualisieren. < /P>

Dataset processByRow(Dataset dsRecords, SparkSession sparkSession) {
sparkSession.udf().register("updateToken", updateToken, DataTypes.StringType);
return ds = dsRecords.withColumn("eventRecordTokenText", callUDF("updateToken", dsRecords.col("eventRecordTokenText")));
}

static UDF1 updateToken = new UDF1() {
public String call(final String tokenText) throws Exception {
// xml operations here..
for (int nodeIndex = 0; nodeIndex < nList.getLength(); nodeIndex++) {
Node thisNode = nList.item(nodeIndex);
if (thisNode.getAttributes().getNamedItem("ProcessTokenValue") != null && thisNode.getAttributes()
.getNamedItem("ProcessTokenValue").getNodeValue().equalsIgnoreCase("true")) {
Node valueNode = thisNode.getAttributes().getNamedItem("Value");
String thisToken = valueNode.getNodeValue();
String newToken = mappedTokens.get(thisToken); // *returns null values from the map*
if(newToken != null && !newToken.isEmpty())
valueNode.setNodeValue(newToken);
}
}
// more xml operations here..
return output;
}
};
< /code>

Update 2 - ITREAT UND UPDATE < /strong>
Jetzt versuche ich Zeile durch Zeilenverfolgung .. < /p>

Dataset processByRow1(Dataset dsRecords, SparkSession sparkSession) {
List newRows = new ArrayList();
dsRecords.foreach((ForeachFunction) record -> {
String currentToken = record.getAs(AppConstants.TokenCol);
String newToken = mappedTokens.get(currentToken);
newRows.add(new MongoRecordSmall(record.getString(0), record.getString(1), newToken, record.getString(3)));
logger.error(“Size plus=“+newRows.size());
});
return sparkSession.createDataFrame(newRows, MongoRecordSmall.class);
}
< /code>

Dies wirft einen Serliazationsfehler. Es scheint .

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post