Ich habe ein Projekt mit Kafka -Streams, um eine Minute Kerze für Aktien zu erstellen. Mein Topologiecode lautet: < /p>
List inputTopics = new ArrayList();
inputTopics.add(tradeTopic);
Consumed CandleConsumerOptions = Consumed
.with(Serdes.String(), ProtobufSerdes.Trade())
.withTimestampExtractor(new CandleTimestampExtractor());
KTable ohlcKTable =
stream
.filter((key, Trade) -> Trade.getTradeTime() != 0 && Trade.getTradeTime() > todayMillis)
.groupByKey(
Grouped.with(Serdes.String(),ProtobufSerdes.Trade())
)
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1)),Duration.ofHours(8)))
.aggregate(
()-> ProtoClass.OHLC.newBuilder().build(),
(String key, ProtoClass.Trade trade,ProtoClass.OHLC ohlc) -> ExtendedOHLC.add(trade,key,ohlc),
Materialized.as(stateStoreName)
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(ProtobufSerdes.OHLC())
);
< /code>
und
Meine Konfiguration ist: < /p>
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServer);
streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/app/kafka-stream");
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
streamsConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
< /code>
Meine Serviceschicht lautet: < /p>
public List getRange(String symbol, long from, long to) {
Instant fromTime = Instant.ofEpochMilli(convertEpochTimeToLocal(from));
Instant toTime = Instant.ofEpochMilli(convertEpochTimeToLocal(to));
KeyQueryMetadata metadata = streams.queryMetadataForKey(stateStoreName, symbol, Serdes.String().serializer());
if (hostInfo.equals(metadata.activeHost()))
return fetchCandlesLocally(symbol, fromTime, toTime);
else
return fetchCandlesRemotelyGrpc(symbol, from, to, metadata.activeHost());
//return fetchCandlesRemotely(symbol, from, to, metadata.activeHost());
}
private List fetchCandlesLocally(String symbol, Instant fromTime, Instant toTime) {
readOnlyWindowStore = streams.store(StoreQueryParameters.fromNameAndType(stateStoreName, QueryableStoreTypes.windowStore()));
List candles = new ArrayList();
try (WindowStoreIterator
range = readOnlyWindowStore.fetch(symbol, fromTime, toTime)) {
if (range == null)
return candles;
while (range.hasNext()) {
KeyValue next = range.next();
if (next.value != null) {
Candle candle = createCandle(symbol, next.value,next.key);
candles.add(candle);
}
}
}
return candles;
}
< /code>
Mein Problem ist, dass ich viele Eingabedaten für die Verarbeitung habe und ungefähr 700 Anforderungen pro Sekunde in der Serviceschicht erheblich erhöht. Ich habe 24 Partitionen und ich habe 12 Instanzen. Warum tut dieses Problem? Und was ist Ihr Vorschlag zur Lösung dieses Problems?
Verringern Sie die Reaktionszeit in Kafka -Streams ⇐ Java
-
- Similar Topics
- Replies
- Views
- Last post
-
-
Ist dies die richtige Kafka Consumer -Konfiguration - unter diesem Kafka -Setup?
by Anonymous » » in Java - 0 Replies
- 13 Views
-
Last post by Anonymous
-