Früh (?) Auslösen der Ereigniszeitfenster von Flink und nicht deterministischen ErgebnissenJava

Java-Forum
Anonymous
 Früh (?) Auslösen der Ereigniszeitfenster von Flink und nicht deterministischen Ergebnissen

Post by Anonymous »

Ich lese eine kleine Stichprobe von Daten, die in einem Kafka gespeichert sind, aber anstatt Wasserzeichen direkt auf die Quelle anzuwenden, mache ich einige Verarbeitung auf den Daten und extrahiere dann die Zeitstempel des Ereignisses. Meistens sind die Ergebnisse in Ordnung, aber manchmal scheint das Fenster (früh?) Ausgelöst zu werden und enthält keine Ereignisse, obwohl ihre Zeitstempel in die Fensterdauer fallen sollten. Ich teste nur für 1-Window. Siehe unten zum Beispiel und versuchen Sie zu verstehen, warum einige Ereignisse, obwohl ihre Zeitstempel in den Fenstergrenzen liegen, nicht in das Fenster einbezogen werden? < /P>
//seconds
int windowSize = 5
int windowSlide = 5
int windowOffset = 0
int envParallelism = 5

//read data from kafkasource
DataStream eventstream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(1);

// some preprocessing, produce inValue tuples
eventstream = eventstream.flatMap(new JsonFlatMap());

WatermarkStrategy watermarkStrategy = WatermarkStrategy.
forBoundedOutOfOrderness(Duration.ofSeconds(maxOutOfOrderness));

WatermarkStrategy watermarkStrategyWithTimestampAssigner = watermarkStrategy
.withTimestampAssigner(
new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.time;
}
});
eventStream = eventStream.assignTimestampsAndWatermarks(watermarkStrategyWithTimestampAssigner).setParallelism(1);

eventStream.print();
       
//--for every "eventName"(key), add "inValue" to "inValueList" and "outValue" to "outValueList" 
DataStream output = eventStream
.keyBy(new EventKeySelector("eventName")
.window(SlidingEventTimeWindows.of(
Time.seconds(windowSize), Time.seconds(windowSlide),
Time.seconds(windowOffset)))
.process(new GraphProcessWindowFunction());
output.print();
< /code>
Dies sind die in Kafka gespeicherten Daten, nur vier Schlüssel (Ereignisname): < /p>
{"eventName":1,"outValue":2,"time":1743663199798}
{"eventName":2,"outValue":1,"time":1743663199908}
{"eventName":2,"outValue":6,"time":1743663199911}
{"eventName":3,"outValue":6,"time":1743663199914}
{"eventName":6,"outValue":1,"time":1743663199918}
{"eventName":6,"outValue":3,"time":1743663199920}
{"eventName":2,"outValue":6,"time":1743663207928}
{"eventName":3,"outValue":6,"time":1743663207933}
< /code>
Die letzten beiden Ereignisse werden nach 8000 ms erstellt, und sind zu Recht nicht im Fenster enthalten. Nach der Verarbeitung werden die „unschätzbaren“ Tupel erzeugt. Einige Ereignisse sind dem Fenster nicht zugeordnet, obwohl der Zeitstempel in Fenstergrenzen fällt. src = "https://i.static.net/wpn7wxy8.png"/>
Dinge, die ich bisher ausprobiert habe:
Wenn ich festgelegt habe, ist das Fenster. zweites Mal. Wie vermeide ich dies? Habe auch nicht geholfen. Ich habe noch nie einen anderen Wert erhalten, obwohl ich das Wasserzeichen -Intervall geändert habe. Java 11

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post