Ich arbeite daran, Unit -Tests für eine Kafka -Streams -App zu schreiben, die ein Zeitfenster verwendet, um Nachrichten zu aggregieren und den in diesen Nachrichten während dieses Fenster angezeigten Maximalwerts zurückzugeben. Unten finden Sie die Definition für den Stream. < /P>
StreamsBuilder builder = new StreamsBuilder();
KStream stream = builder.stream(inputTopicName, Consumed.with(Serdes.String(), outputValueSerde).withTimestampExtractor(new CustomTimestampExtractor()));
stream.groupBy((key, value) -> String.valueOf(value.getMessageKey()),
Grouped.with(Serdes.String(), outputValueSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(AGGREGATION_PERIOD)))
.emitStrategy(EmitStrategy.onWindowClose())
.aggregate(
CustomObject::new,
(key, value, aggregate) ->
{
// aggregation logic
},
Materialized.
as("state-store")
.withKeySerde(Serdes.String())
.withValueSerde(outputValueSerde)
.withStoreType(Materialized.StoreType.IN_MEMORY)
.withCachingDisabled()
.withRetention(Duration.ofSeconds(AGGREGATION_PERIOD * RETENTION_BUFFER_RATE))
)
.toStream(windowedKey, aggregate) -> windowedKey.key())
.to(outputTopicName, Produced.with(Serdes.String(), outputValueSerde));
< /code>
Beim Ausführen verhält sich dieser Stream wie gewünscht. Der Unit -Test nicht. Unten finden Sie den fraglichen Test. < /P>
@Test
public void testStreamsValidInput()
{
// two messages within window
CustomObject value = new CustomObject("", , );
CustomObject aggregate = new CustomObject("", , );
// send message timed for after window of first two messages, advances stream time and closes previous window
CustomObject advance = new CustomObject("", , );
// send first two
inputTopic.pipeInput(aggregate);
inputTopic.pipeInput(value);
// send third to close window
inputTopic.pipeInput(advance);
CustomObject output = null;
try
{
output = outputTopic.readValue();
} catch (NoSuchElementException e)
{
fail("No message was produced to output topic!", e);
}
assertEquals(, output.getTimestamp().toString());
assertEquals(1, output.getIntegerKey());
assertEquals(2.0, output.getDoubleValueBeingAggregated());
// confirm only one message was produced to output topic
assertThrows(NoSuchElementException.class, () -> outputTopic.readValue());
}
Bei Ausführen dieses Tests fällt es bei outputTopic.readValue () aus, da der Stream keinen Wert für das Ausgabethema erbrachte.
Ich habe den realen Stream ausgeführt und genau die gleichen Nachrichten in das Eingabesthema gedrückt, wie ich im Test bin. Der reale Stream verhielt sich wie erwartet: Nach dem Versenden der dritten Nachricht wurde das erste Fenster geschlossen und die richtige Gesamtnachricht erstellt. Code> to .Suppress (unterdrückte. bestanden. Mit Suppress () , aber nicht emitStrategy () und wie der Test behebt, damit er mit emitStrategy () funktioniert, wäre es viel geschätzt.
Danke im Voraus.
Ich arbeite daran, Unit -Tests für eine Kafka -Streams -App zu schreiben, die ein Zeitfenster verwendet, um Nachrichten zu aggregieren und den in diesen Nachrichten während dieses Fenster angezeigten Maximalwerts zurückzugeben. Unten finden Sie die Definition für den Stream. < /P> [code]StreamsBuilder builder = new StreamsBuilder(); KStream stream = builder.stream(inputTopicName, Consumed.with(Serdes.String(), outputValueSerde).withTimestampExtractor(new CustomTimestampExtractor())); stream.groupBy((key, value) -> String.valueOf(value.getMessageKey()), Grouped.with(Serdes.String(), outputValueSerde)) .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(AGGREGATION_PERIOD))) .emitStrategy(EmitStrategy.onWindowClose()) .aggregate( CustomObject::new, (key, value, aggregate) -> { // aggregation logic }, Materialized. as("state-store") .withKeySerde(Serdes.String()) .withValueSerde(outputValueSerde) .withStoreType(Materialized.StoreType.IN_MEMORY) .withCachingDisabled() .withRetention(Duration.ofSeconds(AGGREGATION_PERIOD * RETENTION_BUFFER_RATE)) ) .toStream(windowedKey, aggregate) -> windowedKey.key()) .to(outputTopicName, Produced.with(Serdes.String(), outputValueSerde)); < /code> Beim Ausführen verhält sich dieser Stream wie gewünscht. Der Unit -Test nicht. Unten finden Sie den fraglichen Test. < /P> @Test public void testStreamsValidInput() { // two messages within window CustomObject value = new CustomObject("", , ); CustomObject aggregate = new CustomObject("", , );
// send message timed for after window of first two messages, advances stream time and closes previous window CustomObject advance = new CustomObject("", , );
// send first two inputTopic.pipeInput(aggregate); inputTopic.pipeInput(value);
// send third to close window inputTopic.pipeInput(advance);
CustomObject output = null; try { output = outputTopic.readValue(); } catch (NoSuchElementException e) { fail("No message was produced to output topic!", e); }
// confirm only one message was produced to output topic assertThrows(NoSuchElementException.class, () -> outputTopic.readValue()); } [/code] Bei Ausführen dieses Tests fällt es bei outputTopic.readValue () aus, da der Stream keinen Wert für das Ausgabethema erbrachte. Ich habe den realen Stream ausgeführt und genau die gleichen Nachrichten in das Eingabesthema gedrückt, wie ich im Test bin. Der reale Stream verhielt sich wie erwartet: Nach dem Versenden der dritten Nachricht wurde das erste Fenster geschlossen und die richtige Gesamtnachricht erstellt. Code> to .Suppress (unterdrückte. bestanden. Mit Suppress () , aber nicht emitStrategy () und wie der Test behebt, damit er mit emitStrategy () funktioniert, wäre es viel geschätzt. Danke im Voraus.
Ich habe ein Projekt mit Kafka -Streams, um eine Minute Kerze für Aktien zu erstellen. Mein Topologiecode lautet:
List inputTopics = new ArrayList();
inputTopics.add(tradeTopic);
Consumed...
Ich verwende Spring-Kafka 2.2.2.RELEASE(org.apache.kafka:kafka-clients:jar:2.0.1) und Spring-Boot(2.1.1). Ich kann keine Transaktion ausführen, da mein Listener keine Partition zuweisen kann. Ich...
Ich arbeite derzeit an einem Projekt mit Java Spring Boot und Apache Kafka, bei dem mehrere Microservices über Kafka kommunizieren. Unser Ziel ist es, 100.000 Transaktionen pro Sekunde (TPS) (oder...
Hier geht es um den Kafka-Hörer. Wir haben keinen Zugriff auf den Herausgeber (es ist ein Drittanbieter).
Genau 9 Minuten nach dem Start der App erhalten wir Folgendes in unseren...