Kafka Streams Topology Test Triver - Rekorde mit unterdrückten (), aber nicht emitStrategy ()Java

Java-Forum
Anonymous
 Kafka Streams Topology Test Triver - Rekorde mit unterdrückten (), aber nicht emitStrategy ()

Post by Anonymous »

Ich arbeite daran, Unit -Tests für eine Kafka -Streams -App zu schreiben, die ein Zeitfenster verwendet, um Nachrichten zu aggregieren und den in diesen Nachrichten während dieses Fenster angezeigten Maximalwerts zurückzugeben. Unten finden Sie die Definition für den Stream. < /P>

Code: Select all

StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(inputTopicName, Consumed.with(Serdes.String(), outputValueSerde).withTimestampExtractor(new CustomTimestampExtractor()));
stream.groupBy((key, value) -> String.valueOf(value.getMessageKey()),
Grouped.with(Serdes.String(), outputValueSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(AGGREGATION_PERIOD)))
.emitStrategy(EmitStrategy.onWindowClose())
.aggregate(
CustomObject::new,
(key, value, aggregate) ->
{
// aggregation logic
},
Materialized.
as("state-store")
.withKeySerde(Serdes.String())
.withValueSerde(outputValueSerde)
.withStoreType(Materialized.StoreType.IN_MEMORY)
.withCachingDisabled()
.withRetention(Duration.ofSeconds(AGGREGATION_PERIOD * RETENTION_BUFFER_RATE))
)
.toStream(windowedKey, aggregate) -> windowedKey.key())
.to(outputTopicName, Produced.with(Serdes.String(), outputValueSerde));
< /code>
Beim Ausführen verhält sich dieser Stream wie gewünscht. Der Unit -Test nicht. Unten finden Sie den fraglichen Test. < /P>
    @Test
public void testStreamsValidInput()
{
// two messages within window
CustomObject value = new CustomObject("", , );
CustomObject aggregate = new CustomObject("", , );

// send message timed for after window of first two messages, advances stream time and closes previous window
CustomObject advance = new CustomObject("", , );

// send first two
inputTopic.pipeInput(aggregate);
inputTopic.pipeInput(value);

// send third to close window
inputTopic.pipeInput(advance);

CustomObject output = null;
try
{
output = outputTopic.readValue();
} catch (NoSuchElementException e)
{
fail("No message was produced to output topic!", e);
}

assertEquals(, output.getTimestamp().toString());
assertEquals(1, output.getIntegerKey());
assertEquals(2.0, output.getDoubleValueBeingAggregated());

// confirm only one message was produced to output topic
assertThrows(NoSuchElementException.class, () -> outputTopic.readValue());
}
Bei Ausführen dieses Tests fällt es bei outputTopic.readValue () aus, da der Stream keinen Wert für das Ausgabethema erbrachte.
Ich habe den realen Stream ausgeführt und genau die gleichen Nachrichten in das Eingabesthema gedrückt, wie ich im Test bin. Der reale Stream verhielt sich wie erwartet: Nach dem Versenden der dritten Nachricht wurde das erste Fenster geschlossen und die richtige Gesamtnachricht erstellt. Code> to .Suppress (unterdrückte. bestanden. Mit Suppress () , aber nicht emitStrategy () und wie der Test behebt, damit er mit emitStrategy () funktioniert, wäre es viel geschätzt.
Danke im Voraus.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post