Verringern Sie die Reaktionszeit in Kafka -StreamsJava

Java-Forum
Anonymous
 Verringern Sie die Reaktionszeit in Kafka -Streams

Post by Anonymous »

Ich habe ein Projekt mit Kafka -Streams, um eine Minute Kerze für Aktien zu erstellen. Mein Topologiecode lautet: < /p>
List inputTopics = new ArrayList();
inputTopics.add(tradeTopic);
Consumed CandleConsumerOptions = Consumed
.with(Serdes.String(), ProtobufSerdes.Trade())
.withTimestampExtractor(new CandleTimestampExtractor());

KTable ohlcKTable =
stream
.filter((key, Trade) -> Trade.getTradeTime() != 0 && Trade.getTradeTime() > todayMillis)
.groupByKey(
Grouped.with(Serdes.String(),ProtobufSerdes.Trade())
)
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1)),Duration.ofHours(8)))
.aggregate(
()-> ProtoClass.OHLC.newBuilder().build(),
(String key, ProtoClass.Trade trade,ProtoClass.OHLC ohlc) -> ExtendedOHLC.add(trade,key,ohlc),
Materialized.as(stateStoreName)
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(ProtobufSerdes.OHLC())
);
< /code>
und

Meine Konfiguration ist: < /p>
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServer);
streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, appServerConfig);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/app/kafka-stream");
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
streamsConfiguration.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);
streamsConfiguration.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
< /code>
Meine Serviceschicht lautet: < /p>
public List getRange(String symbol, long from, long to) {
Instant fromTime = Instant.ofEpochMilli(convertEpochTimeToLocal(from));
Instant toTime = Instant.ofEpochMilli(convertEpochTimeToLocal(to));

KeyQueryMetadata metadata = streams.queryMetadataForKey(stateStoreName, symbol, Serdes.String().serializer());
if (hostInfo.equals(metadata.activeHost()))
return fetchCandlesLocally(symbol, fromTime, toTime);
else
return fetchCandlesRemotelyGrpc(symbol, from, to, metadata.activeHost());
//return fetchCandlesRemotely(symbol, from, to, metadata.activeHost());

}
private List fetchCandlesLocally(String symbol, Instant fromTime, Instant toTime) {

readOnlyWindowStore = streams.store(StoreQueryParameters.fromNameAndType(stateStoreName, QueryableStoreTypes.windowStore()));

List candles = new ArrayList();
try (WindowStoreIterator
range = readOnlyWindowStore.fetch(symbol, fromTime, toTime)) {
if (range == null)
return candles;

while (range.hasNext()) {
KeyValue next = range.next();
if (next.value != null) {
Candle candle = createCandle(symbol, next.value,next.key);
candles.add(candle);
}
}
}
return candles;
}
< /code>
Mein Problem ist, dass ich viele Eingabedaten für die Verarbeitung habe und ungefähr 700 Anforderungen pro Sekunde in der Serviceschicht erheblich erhöht. Ich habe 24 Partitionen und ich habe 12 Instanzen. Warum tut dieses Problem? Und was ist Ihr Vorschlag zur Lösung dieses Problems?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post