Danke!
Code: Select all
KafkaProducerConfig.java
Bean(name = "packetData")
public KafkaProducer packetDataKafkaProducer() {
Map config = new HashMap();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURLConfiguration.getKafkaURL());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 600000);
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 200000);
config.put(ProducerConfig.LINGER_MS_CONFIG, 10);
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
return new KafkaProducer(config, new StringSerializer(), new JsonSerializer());
}
ProducerService.java
// Build a record addressed to the topic returned by driver.getCollectionTopic(),
// carrying packetData as its payload. Only these two arguments are supplied.
ProducerRecord records = new ProducerRecord(driver.getCollectionTopic(), packetData);
// NOTE(review): the return value of send(...) is discarded here, so nothing in this
// excerpt observes whether delivery succeeded. If kafkaProducer is a KafkaProducer,
// send is asynchronous and failures would go unnoticed -- confirm whether a callback
// or a check of the returned Future is needed.
kafkaProducer.send(records);