ContainerFactory in KafkaListener funktioniert nach dem Upgrade von 2.5 auf Spring Boot 3.4 nicht
Posted: 31 Dec 2024, 15:56
Das Upgrade von SpringBoot 2.5.12 auf 3.4.0 hatte zur Folge, dass KafkaListener die von containerFactory angegebene Bean nicht auswählen konnte und der folgende Fehler ausgegeben wurde.
Unten ist die Klasse, die den Listener enthält.
Unten ist kafkaListenerContainerFactory
Unten ist der Test:
Unten sind die verwendeten application-test.properties aufgeführt:
Das Entfernen des ContainerFactory-Parameters aus der KafkaListener-Annotation funktioniert alles wie erwartet.
Code: Select all
11:27:40.337 INFO [roducer-network-thread | consumer-test-1] [ org.apache.kafka.clients.NetworkClient, 1017] - [Producer clientId=consumer-test-1] Node -1 disconnected.
11:27:40.338 WARN [roducer-network-thread | consumer-test-1] [ org.apache.kafka.clients.NetworkClient, 849] - [Producer clientId=consumer-test-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
11:27:40.338 WARN [roducer-network-thread | consumer-test-1] [ o.a.k.c.NetworkClient$DefaultMetadataUpdater, 1173] - [Producer clientId=consumer-test-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
11:27:40.720 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 684] - Metrics scheduler closed
11:27:40.722 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 688] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
11:27:40.722 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 688] - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
11:27:40.723 INFO [ Test worker] [ org.apache.kafka.common.metrics.Metrics, 694] - Metrics reporters closed
11:27:40.724 INFO [ Test worker] [ o.a.kafka.common.utils.AppInfoParser, 88] - App info kafka.consumer for consumer-test-1 unregistered
11:27:40.734 WARN [ Test worker] [ o.s.c.support.AbstractApplicationContext, 635] - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
java.lang.IllegalStateException: Failed to load ApplicationContext for [ReactiveWebMergedContextConfiguration@213229a5 testClass = de.ingest.util.kafka.KafkaErrorRetryTest, locations = [], classes = [de.ingest.util.kafka.KafkaErrorRetryReceiver, de.ingest.config.KafkaTestsConfig, de.ingest.rx.ReactorSchedulerNewThreadConfig], contextInitializerClasses = [], activeProfiles = ["test"], propertySourceDescriptors = [], propertySourceProperties = ["org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true"], contextCustomizers = [org.springframework.boot.testcontainers.service.connection.ServiceConnectionContextCustomizer@0, org.springframework.boot.test.autoconfigure.OnFailureConditionReportContextCustomizerFactory$OnFailureConditionReportContextCustomizer@41e9f86, org.springframework.boot.test.autoconfigure.actuate.observability.ObservabilityContextCustomizerFactory$DisableObservabilityContextCustomizer@1f, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizer@2416c658, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@1e6060f1, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@478fb7dc, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@2a8dd942, org.springframework.boot.test.web.reactive.server.WebTestClientContextCustomizer@372f0a99, org.springframework.boot.test.web.reactor.netty.DisableReactorResourceFactoryGlobalResourcesContextCustomizerFactory$DisableReactorResourceFactoryGlobalResourcesContextCustomizerCustomizer@640d604, org.springframework.test.context.support.DynamicPropertiesContextCustomizer@0, org.springframework.boot.test.context.SpringBootTestAnnotation@f4fcfb5d], contextLoader = org.springframework.boot.test.context.SpringBootContextLoader, parent = null]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:180)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260)
at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:160)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318)
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1461)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.(LegacyKafkaConsumer.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:600)
at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:595)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.(DefaultKafkaConsumerFactory.java:498)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:875)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:386)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323)
... 37 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Code: Select all
@Service
public class KafkaErrorRetryReceiver {
public static final String LISTENER_ID ="test";
public static final String TEST_TOPIC ="orders";
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private CountDownLatch receivedLatch = new CountDownLatch(3);
private CountDownLatch succeededLatch = new CountDownLatch(3);
private CountDownLatch errorLatch = new CountDownLatch(1); // throw error only once - recover on 2nd attempt
private ConcurrentLinkedQueue successfulRecords = new ConcurrentLinkedQueue();
@Autowired
private ProjectReactorConsumerUtil projectReactorConsumerUtil;
@KafkaListener(id = LISTENER_ID, topics = TEST_TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void topicListener(ConsumerRecord consumerRecord, Acknowledgment ack) {
projectReactorConsumerUtil.acknowledgeCompletableWhenDone(LOGGER, ack, consumerRecord, this::receive);
}
public Mono receive(GenericRecord record) {
LOGGER.info("received payload='{}'", record.toString());
receivedLatch.countDown();
if (record.toString().contains("fail") && errorLatch.getCount() > 0) {
errorLatch.countDown();
return Mono.error(() -> {
RuntimeException e = new RuntimeException("simulated consumer failure");
e.setStackTrace(new StackTraceElement[0]);
return e;
});
}
successfulRecords.add(record);
succeededLatch.countDown();
return Mono.empty();
}
public CountDownLatch getReceivedLatch() {
return receivedLatch;
}
public CountDownLatch getSucceededLatch() {
return succeededLatch;
}
public CountDownLatch getErrorLatch() {
return errorLatch;
}
public List getSuccessfulRecords() {
return ImmutableList.copyOf(successfulRecords);
}
}
Code: Select all
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
public static final String RETRYING_CONTAINER_FACTORY = "retryingKafkaListenerContainerFactory";
private static final int BACKOFF_MAX_INTERVAL_MS = 7500; // must be lees than session timeout
/**
* Creates a ConcurrentKafkaListenerContainerFactory bean.
* @param configurer the configurer
* @param kafkaConsumerFactory the kafka consumer factory
* @return the concurrent kafka listener container factory
* */
@Bean
@Primary
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory kafkaConsumerFactory,
ServiceProperties serviceProperties) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
}
Code: Select all
@ContextConfiguration(classes = {KafkaErrorRetryReceiver.class, KafkaTestsConfig.class, ReactorSchedulerNewThreadConfig.class})
@ActiveProfiles("test")
@SpringBootTest
@Testcontainers
public class KafkaErrorRetryTest {
public static final String REC_KEY = "msg_key";
public static final String TOPIC_NAME = KafkaErrorRetryReceiver.TEST_TOPIC;
@Autowired
private KafkaTemplate producer;
@Autowired
private KafkaErrorRetryReceiver receiver;
public void tearDown() {
KafkaTestsConfig.deleteTopic(TOPIC_NAME);
}
@Test
public void testReceive() throws Exception {
var createTime = new AtomicLong(System.currentTimeMillis());
producer.send(TOPIC_NAME, 0, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok1")).get();
producer.send(TOPIC_NAME, 1, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("fail")).get();
producer.send(TOPIC_NAME, 2, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok2")).get();
receiver.getReceivedLatch().await(10000, TimeUnit.MILLISECONDS);
receiver.getSucceededLatch().await(10000, TimeUnit.MILLISECONDS);
assertEquals(0, receiver.getReceivedLatch().getCount());
assertEquals(0, receiver.getErrorLatch().getCount());
assertEquals(3, receiver.getSuccessfulRecords().size());
}
}
Code: Select all
# Kafka config
service.kafka.topiccreator-autostart=true
service.kafka.partitions=3
service.kafka.replication-factor=1
spring.kafka.admin.security.protocol=PLAINTEXT
#logging.level.org.apache.kafka.clients=TRACE
#logging.level.org.springframework.kafka=TRACE
# kafka streams TopicAutoCreator - uncomment it when deciding upon increasing the replication factor for kafka streams
spring.kafka.streams.replication-factor=1
spring.kafka.streams.state-store-cache-max-size=0KB
spring.kafka.streams.cleanup.on-startup=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092