KafkaListenerEndpointContainer Die Kafka-Transaktion konnte nicht mit Spring Kafka erstellt werdenJava

Java-Forum
Guest
 KafkaListenerEndpointContainer Die Kafka-Transaktion konnte nicht mit Spring Kafka erstellt werden

Post by Guest »

Ich verwende Spring-Kafka 2.2.2.RELEASE(org.apache.kafka:kafka-clients:jar:2.0.1) und Spring-Boot(2.1.1). Ich kann keine Transaktion ausführen, da mein Listener keine Partition zuweisen kann. Ich habe die vorgeschlagene Konfiguration für genau einen Verbraucher erstellt. Ich versuche, einen transaktionalen Listener-Container und eine exakt einmalige Verarbeitung zu konfigurieren.

Ich habe den Produzenten und Konsumenten mit dem Transaktionsmanager konfiguriert, den Produzenten mit der Transaktions-ID, den Konsumenten mit isolation.level=read_committed .

Code: Select all

@Bean(name = "producerFactory")
public ProducerFactory producerFactory() {
Map configProps = new HashMap();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"txApp");
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(configProps);
producerFactory.setTransactionIdPrefix("tx.");

return producerFactory;
}

@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(producerFactory());
// ...
return kafkaTransactionManager;
}

@Bean(name="appTemplate")
public KafkaTemplate kafkaTemplate(){
KafkaTemplate kafkaTemplate = new KafkaTemplate(
producerFactory());
return kafkaTemplate;
}

//Consumer

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory kafkaConsumerFactory,
KafkaTransactionManager kafkaTransactionManager) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
return factory;
}

//in the Consumer
@KafkaListener(topics = "myTopic", groupId = "ingest", concurrency = "4")
public void listener(@Payload MyObject message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws ExecutionException, InterruptedException {

...

// In my producer

myTemplate.executeInTransaction(t-> t.send(kafkaConfig.getTopicName(), myMessage));

Ich erwarte, dass die Nachricht bei meinem Listener ankommt, aber wenn ich den Produzenten ausführe, erhalte ich die folgende Fehlermeldung:

Code: Select all

22-07-2019 10:21:55.283 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR  o.a.k.c.c.i.ConsumerCoordinator.onJoinComplete request.id= request.caller=  - [Consumer clientId=consumer-2, groupId=ingest] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment
org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction;  nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:1657)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post