Ich habe einen Kafka -Produzenten, der Nachrichten mit der folgenden SendMessage -Methode sendet. Die Methode nimmt einen Themennamen, Gruppennamen, Taste und Nachrichtennutzlast an und verarbeitet und veröffentlicht dann mehrere Nachrichten aus einem JSON -Array. < /P>
public void sendMessage(String topicName, String groupName, String key, String msg) throws Exception {
if (msg != null) {
JSONObject jsonReq = new JSONObject(msg);
if (key == null || topicName == null || "".equals(key) || "".equals(topicName)) {
throw new CustomException("Invalid key/topicname.");
}
if (jsonReq.has(groupName)) {
JSONArray messages = jsonReq.getJSONArray(groupName);
if (messages.length() == 0) {
log.info("No messages found in groupName: {}. Skipping processing.", groupName);
return;
}
List messageIds = new ArrayList();
AtomicBoolean errorFlag = new AtomicBoolean(false);
long startTime = System.currentTimeMillis();
for (int i = 0; i < messages.length(); i++) {
JSONObject obj = messages.getJSONObject(i);
if (obj.has(key) && StringUtils.isNotBlank(obj.getString(key))) {
ProducerRecord producerRecord =
new ProducerRecord(topicName, obj.getString(key), obj.toString());
producer.send(producerRecord, new CRKafkaCallBackHandler(producerRecord, errorFlag));
messageIds.add(topicName + "\t" + obj.getString(key));
} else {
throw new CustomException("Mandatory property '" + key + "' is missing in message: " + obj.toString());
}
}
long endTime = System.currentTimeMillis();
log.info("Total Messages: {} For Topic: {} Time Taken: {} ms", messages.length(), topicName, (endTime - startTime));
if (errorFlag.get()) {
throw new CustomException("Failed to publish one or more messages to Kafka");
}
} else {
throw new CustomException("Invalid groupName found in request.");
}
} else {
throw new CustomException("Received a NULL or invalid message.");
}
log.info("Message processing completed successfully For Topic: {}", topicName);
}
< /code>
Im Moment läuft diese Methode in einer einzigen Thread-Weise und wir erhalten Timeoutexception < /p>
org.apache.kafka.common.Errors.TimeoutException: expiring 18 record (s) 120001 ms wurde seitdem batch crreation < /p> < /p> < /p> < /p> < /p> < /p> < /p /pl. Vermutlich, es könnte ein Engpass unter hoher Belastung sein. Ich möchte den Durchsatz verbessern, indem ich einen Multi-Thread-Kafka-Produzenten implementiert. Sollte ich einen Thread-Pool fester Größe erstellen und SendMessage-Aufgaben einreichen, oder gibt es einen Kafka-freundlicheren Ansatz? Wie kann ich die Sicherheit der Faden sicherstellen? Da die Send () -Methode von Kafka asynchron ist>
So implementieren Sie einen Multi-Thread-Kafka-Produzenten in Java ⇐ Java
-
- Similar Topics
- Replies
- Views
- Last post