Problemübersicht:
Ich arbeite an einer Flink-Anwendung, die es Benutzern ermöglicht, Datenflüsse dynamisch zu entwerfen. Die Kern-Engine ist um Stufen herum aufgebaut, wobei ein DataStream nacheinander durch diese Stufen geleitet wird. Jede Stufe verarbeitet den Stream und gibt ihn aus, der dann an die nächste Stufe übergeben wird.
Jetzt muss ich eine Switch-Stufe implementieren, die das Routing des DataStreams in mehrere Routen ermöglicht. Jede Route hat:
Einen Fall (Bedingung): Einen bestimmten Wert, der anhand eines Felds in den Datensätzen des DataStream überprüft werden soll.
Eine Pipeline von Stufen: Jede Route kann ihre eigene haben eigene einzigartige Abfolge von Schritten zur Verarbeitung der Daten, die ihrer Groß-/Kleinschreibung entsprechen.
Das Hauptziel besteht darin:
Den DataStream basierend auf den Feldwerten in jedem Datensatz dynamisch weiterzuleiten.
Stellen Sie sicher, dass ein Datensatz nur dann in eine Route gelangt, wenn seine Bedingung mit dem Fall für übereinstimmt Dieser Weg.
Die Herausforderung:
Flink verwendet standardmäßig eine verzögerte Auswertung, was bedeutet, dass der Ausführungsplan zuerst erstellt wird und keine Daten verarbeitet werden, bis der Job beginnt. Aus diesem Grund:
Wenn die Routing-Logik außerhalb der Funktion „processElement()“ platziert wird, wird sie ausgeführt, bevor Daten verarbeitet werden, was dazu führt, dass alle Routen vorzeitig eingegeben werden.
Wenn die Wenn die Routing-Logik in die Funktion „processElement()“ eingefügt wird, kann ich einzelne Datensätze korrekt weiterleiten, aber:
Ich kann den resultierenden gerouteten DataStream nicht an die nachfolgenden Stufen übergeben.
processElement() funktioniert also nur bei einzelnen Datensätzen Es erlaubt mir nicht, vollständige DataStream-Transformationen dynamisch durchzuführen jede Route.
Anforderungen an die Lösung:
Die Routing-Logik muss zur Laufzeit basierend auf den tatsächlichen Daten im DataStream ausgeführt werden, nicht während der Erstellung des Ausführungsplans des Jobs.
Jede Route muss ihre eigene haben Unabhängige Pipeline von Stufen, die nur die Daten verarbeiten sollte, die dem Zustand der Route entsprechen.
Die Lösung sollte sicherstellen, dass die verzögerte Auswertung nicht alle Routen vorzeitig ausführt und die Verarbeitung nur erfolgt, wenn die Daten eintreffen.
Aktuell Versuche:
Routing-Logik außerhalb von ProcessElement():
Dieser Ansatz führt alle Routenpipelines aus, bevor die Daten verarbeitet werden, da Flink die Transformationslogik im Voraus auswertet zur verzögerten Auswertung.
Dadurch werden alle Routen eingegeben, was nicht das gewünschte Verhalten ist.
Routing-Logik innerhalb von ProcessElement():
Durch Verschieben des Routings Wenn ich die Logik in „processElement()“ verwende, kann ich die Route korrekt identifizieren Ein Datensatz gehört zu.
processElement() arbeitet jedoch mit einzelnen Datensätzen und ermöglicht es mir nicht, den resultierenden gerouteten DataStream dynamisch umzuwandeln oder an die entsprechende Phasenpipeline zu übergeben.
Beispielanwendungsfall:
Phasenkonfiguration wechseln:
Zu prüfendes Feld: field
Routen:
Route A: field = "case1"
Route B: field = "case2"
Erwartetes Verhalten:
Für jeden Datensatz im DataStream:
Wenn der Wert des Felds „case1“ entspricht, sollte der Datensatz an Route A weitergeleitet und durch seine Phasen verarbeitet werden .
Wenn der Wert des Felds „case2“ entspricht, sollte der Datensatz an Route B weitergeleitet und durch seine Stufen verarbeitet werden.
Wenn keine Übereinstimmung gefunden wird, sollte der Datensatz an die Standardpipeline weitergeleitet werden.
/>Probleme Konfrontiert mit:
Eifrige Auswertung: Alle Routenpipelines (z. B. Etappen für Route A und Route B) werden ausgeführt, bevor Daten eintreffen.
Einzeldatensatzverarbeitung: Durch das Platzieren der Logik in „processElement()“ kann ich einzelne Datensätze verarbeiten, aber ich kann den resultierenden gerouteten DataStream nicht dynamisch an die Pipelines der nachfolgenden Stufen übergeben.
Gewünschte Lösung:
Ein Mechanismus, der ermöglicht dynamisches Routing des DataStream basierend auf Zeichnen Sie Feldwerte zur Laufzeit auf.
Jede Route sollte über eine eigene Pipeline von Phasen verfügen, die nur die Daten verarbeitet, die ihrem Fall entsprechen.
Vermeiden Sie eine vorzeitige Ausführung der Routenpipelines während der verzögerten Auswertung von Flink.
Beispiel zur Verdeutlichung:
Mein Code hängt von Stufen ab, jede Stufe kann eine Quelle, Transformation oder Senke sein und jede Stufe enthält eine Initialisierungsfunktion und Funktion ausführen.
Jetzt Ich habe eine Switch-Phase hinzugefügt, die bestimmt, welche Route der Datenfluss nehmen soll. Dies ist wie folgt konfiguriert:
stages=source:source1,rules:rules1,switch:switch1
switch1.type=switch
switch1.routes=routeA,routeB
switch1.field=user_id
routeA.case=1
routeA.stages=source:source2,rules:rules2,target:target1
routeB.case=2
routeB.stages=source:source3,rules:rules3,target:target2
< /code>
Ich habe meinen DataStream so behoben, dass es immer eine user_id enthält, die gleich 1 entspricht. Jetzt sollte es immer Routea eingeben.
, aber es gibt immer beide Routen ein. BR/> Dies ist meine Switch-Stufe Class :
Https://docs.google.com/document/d/1pju ... 5m/editits usp = Sharing
Alle Protokolle werden gedruckt, bevor die Daten angekommen sind, und der Code gibt die Initialisierung und Ausführung aller Routen ein. Es gibt nicht die RouteSPlitterfunction Zuerst entscheiden Sie, welche Route es dauern sollte.
Ich hoffe, dies klärt mein Problem. < /p>
Apache Flink Verzweigung ⇐ Java
-
- Similar Topics
- Replies
- Views
- Last post