Ist es sicher, dieselbe Operator-Status-Instanz in Apache Flink zu mutieren, auszugeben und einen Snapshot zu erstellen?Java

Java-Forum
Anonymous
 Ist es sicher, dieselbe Operator-Status-Instanz in Apache Flink zu mutieren, auszugeben und einen Snapshot zu erstellen?

Post by Anonymous »

Ich erstelle ein einzelnes globales Topologieobjekt in einer nicht verschlüsselten ProcessFunction mit Parallelität = 1. Ich behalte es als lokales veränderbares Objekt und aktualisiere es für jedes Eingabeereignis mit topology.apply(GnmiEvent). Nachdem ich die lokale Topologie aktualisiert habe, rufe ich out.collect(topology) auf, um die aktuelle Ansicht auszugeben. Ich sende diese Ausgabe nachgeschaltet.
  • Wenn ich out.collect(topology) aufrufe und später dieselbe Topologieinstanz für nachfolgende Ereignisse mutiere (über die Methode apply), können nachgeschaltete (verkettete) Operatoren oder Senken diese späteren Mutationen für das zuvor ausgegebene Objekt beobachten? Ich denke, in diesem Fall sollte es in Ordnung sein, weil ich es direkt gesendet habe? Aber was würde passieren, wenn ich einen weiteren Betreiber hinzufügen würde, der keinen Netzwerk-Shuffle impliziert? Unter welchen Bedingungen geschieht das (Verkettung vs. Netzwerkgrenzen / Parallelität)? Muss ich den Status vor dem Senden tief kopieren?
  • Erstellt topologyState.add(topology) in snapshotState() eine sichere tiefe Kopie für gängige Backends (HashMapStateBackend/Heap vs. RocksDB)? Sollte ich beim Erstellen von Snapshots oder beim Senden defensiv kopieren?

Code: Select all

public class TopologyProcessFunction
extends ProcessFunction
implements CheckpointedFunction {

private Topology topology;
private transient ListState topologyState;

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor descriptor =
new ListStateDescriptor("topology-state", Topology.class);
topologyState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
topology = getTopologyFromState();
}
}

@Override
public void processElement(GnmiEvent gnmiEvent,
Context ctx,
Collector out) {
if (topology == null) {
topology = new Topology();
}
topology.apply(gnmiEvent);
out.collect(topology);
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (topology != null) {
topologyState.clear();
topologyState.add(topology);
}
}

private Topology getTopologyFromState() throws Exception {
Iterator it = topologyState.get().iterator();
return it.hasNext() ? it.next() : null;
}
}
Das Einzige, was ich gefunden habe, ist Folgendes: https://stackoverflow.com/a/66597952/11922563
Aber als Flink-Neuling konnte ich nicht ganz verstehen, ob dies tatsächlich meine Frage beantwortet/auf sie zutrifft.
Vielen Dank im Voraus!

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post