by Guest » 31 Dec 2024, 15:56
Nach dem Upgrade von Spring Boot 2.5.12 auf 3.4.0 konnte der KafkaListener die über containerFactory angegebene Bean nicht mehr auswählen, und es wurde der folgende Fehler ausgegeben.
Code: Select all
11:27:40.337 INFO [roducer-network-thread | consumer-test-1] [ org.apache.kafka.clients.NetworkClient, 1017] - [Producer clientId=consumer-test-1] Node -1 disconnected.
11:27:40.338 WARN [roducer-network-thread | consumer-test-1] [ org.apache.kafka.clients.NetworkClient, 849] - [Producer clientId=consumer-test-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
11:27:40.338 WARN [roducer-network-thread | consumer-test-1] [ o.a.k.c.NetworkClient$DefaultMetadataUpdater, 1173] - [Producer clientId=consumer-test-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
11:27:40.720 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 684] - Metrics scheduler closed
11:27:40.722 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 688] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
11:27:40.722 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 688] - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
11:27:40.723 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 694] - Metrics reporters closed
11:27:40.724 INFO [ Test worker] [ o.a.kafka.common.utils.AppInfoParser, 88] - App info kafka.consumer for consumer-test-1 unregistered
11:27:40.734 WARN [ Test worker] [ o.s.c.support.AbstractApplicationContext, 635] - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
java.lang.IllegalStateException: Failed to load ApplicationContext for [ReactiveWebMergedContextConfiguration@213229a5 testClass = de.ingest.util.kafka.KafkaErrorRetryTest, locations = [], classes = [de.ingest.util.kafka.KafkaErrorRetryReceiver, de.ingest.config.KafkaTestsConfig, de.ingest.rx.ReactorSchedulerNewThreadConfig], contextInitializerClasses = [], activeProfiles = ["test"], propertySourceDescriptors = [], propertySourceProperties = ["org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true"], contextCustomizers = [org.springframework.boot.testcontainers.service.connection.ServiceConnectionContextCustomizer@0, org.springframework.boot.test.autoconfigure.OnFailureConditionReportContextCustomizerFactory$OnFailureConditionReportContextCustomizer@41e9f86, org.springframework.boot.test.autoconfigure.actuate.observability.ObservabilityContextCustomizerFactory$DisableObservabilityContextCustomizer@1f, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizer@2416c658, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@1e6060f1, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@478fb7dc, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@2a8dd942, org.springframework.boot.test.web.reactive.server.WebTestClientContextCustomizer@372f0a99, org.springframework.boot.test.web.reactor.netty.DisableReactorResourceFactoryGlobalResourcesContextCustomizerFactory$DisableReactorResourceFactoryGlobalResourcesContextCustomizerCustomizer@640d604, org.springframework.test.context.support.DynamicPropertiesContextCustomizer@0, org.springframework.boot.test.context.SpringBootTestAnnotation@f4fcfb5d], contextLoader = 
org.springframework.boot.test.context.SpringBootContextLoader, parent = null]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:180)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260)
at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:160)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318)
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1461)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:498)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:875)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:386)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323)
... 37 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Unten ist die Klasse, die den Listener enthält.
Code: Select all
@Service
public class KafkaErrorRetryReceiver {
public static final String LISTENER_ID ="test";
public static final String TEST_TOPIC ="orders";
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private CountDownLatch receivedLatch = new CountDownLatch(3);
private CountDownLatch succeededLatch = new CountDownLatch(3);
private CountDownLatch errorLatch = new CountDownLatch(1); // throw error only once - recover on 2nd attempt
private ConcurrentLinkedQueue successfulRecords = new ConcurrentLinkedQueue();
@Autowired
private ProjectReactorConsumerUtil projectReactorConsumerUtil;
@KafkaListener(id = LISTENER_ID, topics = TEST_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void topicListener(ConsumerRecord consumerRecord, Acknowledgment ack) {
projectReactorConsumerUtil.acknowledgeCompletableWhenDone(LOGGER, ack, consumerRecord, this::receive);
}
public Mono receive(GenericRecord record) {
LOGGER.info("received payload='{}'", record.toString());
receivedLatch.countDown();
if (record.toString().contains("fail") && errorLatch.getCount() > 0) {
errorLatch.countDown();
return Mono.error(() -> {
RuntimeException e = new RuntimeException("simulated consumer failure");
e.setStackTrace(new StackTraceElement[0]);
return e;
});
}
successfulRecords.add(record);
succeededLatch.countDown();
return Mono.empty();
}
public CountDownLatch getReceivedLatch() {
return receivedLatch;
}
public CountDownLatch getSucceededLatch() {
return succeededLatch;
}
public CountDownLatch getErrorLatch() {
return errorLatch;
}
public List getSuccessfulRecords() {
return ImmutableList.copyOf(successfulRecords);
}
}
Unten ist die Konfigurationsklasse mit der Bean kafkaListenerContainerFactory:
Code: Select all
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    public static final String RETRYING_CONTAINER_FACTORY = "retryingKafkaListenerContainerFactory";

    // Must be less than the session timeout. Not referenced in the visible part of this class.
    private static final int BACKOFF_MAX_INTERVAL_MS = 7500;

    /**
     * Declares the primary {@code kafkaListenerContainerFactory} bean: a new
     * {@link ConcurrentKafkaListenerContainerFactory} to which the given configurer is applied
     * together with the given consumer factory.
     *
     * @param configurer           configurer applied to the newly created factory
     * @param kafkaConsumerFactory consumer factory handed to the configurer
     * @param serviceProperties    injected but not used in this method body
     * @return the configured listener container factory
     */
    @Bean
    @Primary
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory kafkaConsumerFactory,
            ServiceProperties serviceProperties) {
        final ConcurrentKafkaListenerContainerFactory listenerFactory =
                new ConcurrentKafkaListenerContainerFactory();
        configurer.configure(listenerFactory, kafkaConsumerFactory);
        return listenerFactory;
    }
}
Unten ist der Test:
Code: Select all
@ContextConfiguration(classes = {KafkaErrorRetryReceiver.class, KafkaTestsConfig.class, ReactorSchedulerNewThreadConfig.class})
@ActiveProfiles("test")
@SpringBootTest
@Testcontainers
public class KafkaErrorRetryTest {
public static final String REC_KEY = "msg_key";
public static final String TOPIC_NAME = KafkaErrorRetryReceiver.TEST_TOPIC;
@Autowired
private KafkaTemplate producer;
@Autowired
private KafkaErrorRetryReceiver receiver;
public void tearDown() {
KafkaTestsConfig.deleteTopic(TOPIC_NAME);
}
@Test
public void testReceive() throws Exception {
var createTime = new AtomicLong(System.currentTimeMillis());
producer.send(TOPIC_NAME, 0, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok1")).get();
producer.send(TOPIC_NAME, 1, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("fail")).get();
producer.send(TOPIC_NAME, 2, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok2")).get();
receiver.getReceivedLatch().await(10000, TimeUnit.MILLISECONDS);
receiver.getSucceededLatch().await(10000, TimeUnit.MILLISECONDS);
assertEquals(0, receiver.getReceivedLatch().getCount());
assertEquals(0, receiver.getErrorLatch().getCount());
assertEquals(3, receiver.getSuccessfulRecords().size());
}
}
Unten sind die verwendeten application-test.properties aufgeführt:
Code: Select all
# Kafka config
service.kafka.topiccreator-autostart=true
service.kafka.partitions=3
service.kafka.replication-factor=1
spring.kafka.admin.security.protocol=PLAINTEXT
#logging.level.org.apache.kafka.clients=TRACE
#logging.level.org.springframework.kafka=TRACE
# kafka streams TopicAutoCreator - uncomment it when deciding upon increasing the replication factor for kafka streams
spring.kafka.streams.replication-factor=1
spring.kafka.streams.state-store-cache-max-size=0KB
spring.kafka.streams.cleanup.on-startup=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
Wird der containerFactory-Parameter aus der KafkaListener-Annotation entfernt, funktioniert alles wie erwartet.
Nach dem Upgrade von Spring Boot 2.5.12 auf 3.4.0 konnte der KafkaListener die über containerFactory angegebene Bean nicht mehr auswählen, und es wurde der folgende Fehler ausgegeben.
[code]11:27:40.337 INFO [roducer-network-thread | consumer-test-1] [ org.apache.kafka.clients.NetworkClient, 1017] - [Producer clientId=consumer-test-1] Node -1 disconnected.
11:27:40.338 WARN [roducer-network-thread | consumer-test-1] [ org.apache.kafka.clients.NetworkClient, 849] - [Producer clientId=consumer-test-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
11:27:40.338 WARN [roducer-network-thread | consumer-test-1] [ o.a.k.c.NetworkClient$DefaultMetadataUpdater, 1173] - [Producer clientId=consumer-test-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
11:27:40.720 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 684] - Metrics scheduler closed
11:27:40.722 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 688] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
11:27:40.722 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 688] - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
11:27:40.723 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 694] - Metrics reporters closed
11:27:40.724 INFO [ Test worker] [ o.a.kafka.common.utils.AppInfoParser, 88] - App info kafka.consumer for consumer-test-1 unregistered
11:27:40.734 WARN [ Test worker] [ o.s.c.support.AbstractApplicationContext, 635] - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
java.lang.IllegalStateException: Failed to load ApplicationContext for [ReactiveWebMergedContextConfiguration@213229a5 testClass = de.ingest.util.kafka.KafkaErrorRetryTest, locations = [], classes = [de.ingest.util.kafka.KafkaErrorRetryReceiver, de.ingest.config.KafkaTestsConfig, de.ingest.rx.ReactorSchedulerNewThreadConfig], contextInitializerClasses = [], activeProfiles = ["test"], propertySourceDescriptors = [], propertySourceProperties = ["org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true"], contextCustomizers = [org.springframework.boot.testcontainers.service.connection.ServiceConnectionContextCustomizer@0, org.springframework.boot.test.autoconfigure.OnFailureConditionReportContextCustomizerFactory$OnFailureConditionReportContextCustomizer@41e9f86, org.springframework.boot.test.autoconfigure.actuate.observability.ObservabilityContextCustomizerFactory$DisableObservabilityContextCustomizer@1f, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizer@2416c658, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@1e6060f1, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@478fb7dc, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@2a8dd942, org.springframework.boot.test.web.reactive.server.WebTestClientContextCustomizer@372f0a99, org.springframework.boot.test.web.reactor.netty.DisableReactorResourceFactoryGlobalResourcesContextCustomizerFactory$DisableReactorResourceFactoryGlobalResourcesContextCustomizerCustomizer@640d604, org.springframework.test.context.support.DynamicPropertiesContextCustomizer@0, org.springframework.boot.test.context.SpringBootTestAnnotation@f4fcfb5d], contextLoader = 
org.springframework.boot.test.context.SpringBootContextLoader, parent = null]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:180)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260)
at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:160)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318)
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1461)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:498)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:875)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:386)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323)
... 37 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
[/code]
Unten ist die Klasse, die den Listener enthält.
[code]@Service
public class KafkaErrorRetryReceiver {
public static final String LISTENER_ID ="test";
public static final String TEST_TOPIC ="orders";
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private CountDownLatch receivedLatch = new CountDownLatch(3);
private CountDownLatch succeededLatch = new CountDownLatch(3);
private CountDownLatch errorLatch = new CountDownLatch(1); // throw error only once - recover on 2nd attempt
private ConcurrentLinkedQueue successfulRecords = new ConcurrentLinkedQueue();
@Autowired
private ProjectReactorConsumerUtil projectReactorConsumerUtil;
@KafkaListener(id = LISTENER_ID, topics = TEST_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void topicListener(ConsumerRecord consumerRecord, Acknowledgment ack) {
projectReactorConsumerUtil.acknowledgeCompletableWhenDone(LOGGER, ack, consumerRecord, this::receive);
}
public Mono receive(GenericRecord record) {
LOGGER.info("received payload='{}'", record.toString());
receivedLatch.countDown();
if (record.toString().contains("fail") && errorLatch.getCount() > 0) {
errorLatch.countDown();
return Mono.error(() -> {
RuntimeException e = new RuntimeException("simulated consumer failure");
e.setStackTrace(new StackTraceElement[0]);
return e;
});
}
successfulRecords.add(record);
succeededLatch.countDown();
return Mono.empty();
}
public CountDownLatch getReceivedLatch() {
return receivedLatch;
}
public CountDownLatch getSucceededLatch() {
return succeededLatch;
}
public CountDownLatch getErrorLatch() {
return errorLatch;
}
public List getSuccessfulRecords() {
return ImmutableList.copyOf(successfulRecords);
}
}
[/code]
Unten ist die Konfigurationsklasse mit der Bean kafkaListenerContainerFactory:
[code]@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    public static final String RETRYING_CONTAINER_FACTORY = "retryingKafkaListenerContainerFactory";

    // Must be less than the session timeout. Not referenced in the visible part of this class.
    private static final int BACKOFF_MAX_INTERVAL_MS = 7500;

    /**
     * Declares the primary {@code kafkaListenerContainerFactory} bean: a new
     * {@link ConcurrentKafkaListenerContainerFactory} to which the given configurer is applied
     * together with the given consumer factory.
     *
     * @param configurer           configurer applied to the newly created factory
     * @param kafkaConsumerFactory consumer factory handed to the configurer
     * @param serviceProperties    injected but not used in this method body
     * @return the configured listener container factory
     */
    @Bean
    @Primary
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory kafkaConsumerFactory,
            ServiceProperties serviceProperties) {
        final ConcurrentKafkaListenerContainerFactory listenerFactory =
                new ConcurrentKafkaListenerContainerFactory();
        configurer.configure(listenerFactory, kafkaConsumerFactory);
        return listenerFactory;
    }
}
[/code]
Unten ist der Test:
[code]@ContextConfiguration(classes = {KafkaErrorRetryReceiver.class, KafkaTestsConfig.class, ReactorSchedulerNewThreadConfig.class})
@ActiveProfiles("test")
@SpringBootTest
@Testcontainers
public class KafkaErrorRetryTest {
public static final String REC_KEY = "msg_key";
public static final String TOPIC_NAME = KafkaErrorRetryReceiver.TEST_TOPIC;
@Autowired
private KafkaTemplate producer;
@Autowired
private KafkaErrorRetryReceiver receiver;
public void tearDown() {
KafkaTestsConfig.deleteTopic(TOPIC_NAME);
}
@Test
public void testReceive() throws Exception {
var createTime = new AtomicLong(System.currentTimeMillis());
producer.send(TOPIC_NAME, 0, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok1")).get();
producer.send(TOPIC_NAME, 1, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("fail")).get();
producer.send(TOPIC_NAME, 2, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok2")).get();
receiver.getReceivedLatch().await(10000, TimeUnit.MILLISECONDS);
receiver.getSucceededLatch().await(10000, TimeUnit.MILLISECONDS);
assertEquals(0, receiver.getReceivedLatch().getCount());
assertEquals(0, receiver.getErrorLatch().getCount());
assertEquals(3, receiver.getSuccessfulRecords().size());
}
}
[/code]
Unten sind die verwendeten application-test.properties aufgeführt:
[code]# Kafka config
service.kafka.topiccreator-autostart=true
service.kafka.partitions=3
service.kafka.replication-factor=1
spring.kafka.admin.security.protocol=PLAINTEXT
#logging.level.org.apache.kafka.clients=TRACE
#logging.level.org.springframework.kafka=TRACE
# kafka streams TopicAutoCreator - uncomment it when deciding upon increasing the replication factor for kafka streams
spring.kafka.streams.replication-factor=1
spring.kafka.streams.state-store-cache-max-size=0KB
spring.kafka.streams.cleanup.on-startup=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
[/code]
Wird der containerFactory-Parameter aus der KafkaListener-Annotation entfernt, funktioniert alles wie erwartet.