ContainerFactory in KafkaListener does not work after upgrading from 2.5 to Spring Boot 3.4

Topic review

by Guest » 31 Dec 2024, 15:56

Upgrading from Spring Boot 2.5.12 to 3.4.0 resulted in the KafkaListener no longer being able to select the bean specified by containerFactory, and the following error was reported.

Code:

11:27:40.337 INFO  [roducer-network-thread | consumer-test-1] [            org.apache.kafka.clients.NetworkClient, 1017] - [Producer clientId=consumer-test-1] Node -1 disconnected.
11:27:40.338 WARN  [roducer-network-thread | consumer-test-1] [            org.apache.kafka.clients.NetworkClient,  849] - [Producer clientId=consumer-test-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established.  Node may not be available.
11:27:40.338 WARN  [roducer-network-thread | consumer-test-1] [      o.a.k.c.NetworkClient$DefaultMetadataUpdater, 1173] - [Producer clientId=consumer-test-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
11:27:40.720 INFO  [                             Test worker] [           org.apache.kafka.common.metrics.Metrics,  684] - Metrics scheduler closed
11:27:40.722 INFO  [                             Test worker] [           org.apache.kafka.common.metrics.Metrics,  688] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
11:27:40.722 INFO  [                             Test worker] [           org.apache.kafka.common.metrics.Metrics,  688] - Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
11:27:40.723 INFO  [                             Test worker] [           org.apache.kafka.common.metrics.Metrics,  694] - Metrics reporters closed
11:27:40.724 INFO  [                             Test worker] [              o.a.kafka.common.utils.AppInfoParser,   88] - App info kafka.consumer for consumer-test-1 unregistered
11:27:40.734 WARN  [                             Test worker] [          o.s.c.support.AbstractApplicationContext,  635] - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'

java.lang.IllegalStateException:  Failed to load ApplicationContext for [ReactiveWebMergedContextConfiguration@213229a5 testClass = de.ingest.util.kafka.KafkaErrorRetryTest, locations = [], classes = [de.ingest.util.kafka.KafkaErrorRetryReceiver, de.ingest.config.KafkaTestsConfig, de.ingest.rx.ReactorSchedulerNewThreadConfig], contextInitializerClasses = [], activeProfiles = ["test"], propertySourceDescriptors = [], propertySourceProperties = ["org.springframework.boot.test.context.SpringBootTestContextBootstrapper=true"], contextCustomizers = [org.springframework.boot.testcontainers.service.connection.ServiceConnectionContextCustomizer@0, org.springframework.boot.test.autoconfigure.OnFailureConditionReportContextCustomizerFactory$OnFailureConditionReportContextCustomizer@41e9f86, org.springframework.boot.test.autoconfigure.actuate.observability.ObservabilityContextCustomizerFactory$DisableObservabilityContextCustomizer@1f, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizer@2416c658, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@1e6060f1, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@478fb7dc, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0, org.springframework.boot.test.web.client.TestRestTemplateContextCustomizer@2a8dd942, org.springframework.boot.test.web.reactive.server.WebTestClientContextCustomizer@372f0a99, org.springframework.boot.test.web.reactor.netty.DisableReactorResourceFactoryGlobalResourcesContextCustomizerFactory$DisableReactorResourceFactoryGlobalResourcesContextCustomizerCustomizer@640d604, org.springframework.test.context.support.DynamicPropertiesContextCustomizer@0, org.springframework.boot.test.context.SpringBootTestAnnotation@f4fcfb5d], contextLoader = org.springframework.boot.test.context.SpringBootContextLoader, parent = null]
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:180)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:130)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:142)
at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:98)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:260)
at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:160)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.springframework.context.ApplicationContextException:  Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318)
at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1461)
at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:553)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:225)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:152)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:498)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:875)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:386)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323)
...  37 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Below is the class containing the listener:

Code:

@Service
public class KafkaErrorRetryReceiver {

    public static final String LISTENER_ID = "test";
    public static final String TEST_TOPIC = "orders";

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    private CountDownLatch receivedLatch = new CountDownLatch(3);
    private CountDownLatch succeededLatch = new CountDownLatch(3);
    private CountDownLatch errorLatch = new CountDownLatch(1); // throw error only once - recover on 2nd attempt
    private ConcurrentLinkedQueue<GenericRecord> successfulRecords = new ConcurrentLinkedQueue<>();

    @Autowired
    private ProjectReactorConsumerUtil projectReactorConsumerUtil;

    @KafkaListener(id = LISTENER_ID, topics = TEST_TOPIC, containerFactory = "kafkaListenerContainerFactory")
    public void topicListener(ConsumerRecord<String, GenericRecord> consumerRecord, Acknowledgment ack) {
        projectReactorConsumerUtil.acknowledgeCompletableWhenDone(LOGGER, ack, consumerRecord, this::receive);
    }

    public Mono<Void> receive(GenericRecord record) {
        LOGGER.info("received payload='{}'", record.toString());
        receivedLatch.countDown();
        if (record.toString().contains("fail") && errorLatch.getCount() > 0) {
            errorLatch.countDown();
            return Mono.error(() -> {
                RuntimeException e = new RuntimeException("simulated consumer failure");
                e.setStackTrace(new StackTraceElement[0]);
                return e;
            });
        }
        successfulRecords.add(record);
        succeededLatch.countDown();
        return Mono.empty();
    }

    public CountDownLatch getReceivedLatch() {
        return receivedLatch;
    }

    public CountDownLatch getSucceededLatch() {
        return succeededLatch;
    }

    public CountDownLatch getErrorLatch() {
        return errorLatch;
    }

    public List<GenericRecord> getSuccessfulRecords() {
        return ImmutableList.copyOf(successfulRecords);
    }

}
Below is the kafkaListenerContainerFactory:

Code:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    public static final String RETRYING_CONTAINER_FACTORY = "retryingKafkaListenerContainerFactory";
    private static final int BACKOFF_MAX_INTERVAL_MS = 7500; // must be less than session timeout

    /**
     * Creates a ConcurrentKafkaListenerContainerFactory bean.
     * @param configurer the configurer
     * @param kafkaConsumerFactory the kafka consumer factory
     * @param serviceProperties the service properties
     * @return the concurrent kafka listener container factory
     */
    @Bean
    @Primary
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            ServiceProperties serviceProperties) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }
}
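To check whether the factory is actually registered under the name referenced in the annotation, the application context can be inspected directly. The following is only a minimal diagnostic sketch; the test class name is invented and not part of the original setup.

Code:

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Hypothetical diagnostic test, not part of the original post.
@SpringBootTest
class KafkaFactoryBeanPresenceTest {

    @Autowired
    private ApplicationContext context;

    @Test
    void containerFactoryBeanIsRegistered() {
        // The bean name must match the containerFactory attribute of @KafkaListener exactly.
        assertTrue(context.containsBean("kafkaListenerContainerFactory"));
        assertTrue(context.getBean("kafkaListenerContainerFactory")
                instanceof ConcurrentKafkaListenerContainerFactory);
    }
}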
Below is the test:

Code:

@ContextConfiguration(classes = {KafkaErrorRetryReceiver.class, KafkaTestsConfig.class, ReactorSchedulerNewThreadConfig.class})
@ActiveProfiles("test")
@SpringBootTest
@Testcontainers
public class KafkaErrorRetryTest {

    public static final String REC_KEY = "msg_key";

    public static final String TOPIC_NAME = KafkaErrorRetryReceiver.TEST_TOPIC;

    @Autowired
    private KafkaTemplate<String, GenericRecord> producer;

    @Autowired
    private KafkaErrorRetryReceiver receiver;

    public void tearDown() {
        KafkaTestsConfig.deleteTopic(TOPIC_NAME);
    }

    @Test
    public void testReceive() throws Exception {
        var createTime = new AtomicLong(System.currentTimeMillis());
        producer.send(TOPIC_NAME, 0, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok1")).get();
        producer.send(TOPIC_NAME, 1, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("fail")).get();
        producer.send(TOPIC_NAME, 2, createTime.incrementAndGet(), REC_KEY, KafkaTestDataUtil.testMessageWithValue("ok2")).get();

        receiver.getReceivedLatch().await(10000, TimeUnit.MILLISECONDS);
        receiver.getSucceededLatch().await(10000, TimeUnit.MILLISECONDS);
        assertEquals(0, receiver.getReceivedLatch().getCount());
        assertEquals(0, receiver.getErrorLatch().getCount());
        assertEquals(3, receiver.getSuccessfulRecords().size());
    }
}
Below are the application-test.properties used:

Code:

# Kafka config
service.kafka.topiccreator-autostart=true
service.kafka.partitions=3
service.kafka.replication-factor=1
spring.kafka.admin.security.protocol=PLAINTEXT

#logging.level.org.apache.kafka.clients=TRACE
#logging.level.org.springframework.kafka=TRACE

# kafka streams TopicAutoCreator - uncomment it when deciding upon increasing the replication factor for kafka streams
spring.kafka.streams.replication-factor=1
spring.kafka.streams.state-store-cache-max-size=0KB
spring.kafka.streams.cleanup.on-startup=true

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
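As an aside on the bootstrap address: the stack trace above shows the consumer failing to reach localhost:9092, which matches the hard-coded spring.kafka.bootstrap-servers property. If the broker in the test is supplied by Testcontainers, the mapped address could be registered dynamically instead. This is only a sketch; the container field, the image tag and the use of @DynamicPropertySource are assumptions, since the post does not show how the broker is started.

Code:

// Hypothetical additions to KafkaErrorRetryTest; not taken from the original post.
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

// Broker started by Testcontainers (image tag is an assumption).
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

// Overrides the hard-coded localhost:9092 with the address of the started container.
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}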
After removing the containerFactory parameter from the @KafkaListener annotation, everything works as expected.
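For reference, a minimal sketch of the listener method with that workaround applied, relying on the default factory bean name instead of the explicit containerFactory attribute (the generic types are reconstructed from the receive method and are an assumption):

Code:

// Workaround: without containerFactory, Spring Kafka falls back to the default
// factory bean named "kafkaListenerContainerFactory".
@KafkaListener(id = LISTENER_ID, topics = TEST_TOPIC)
public void topicListener(ConsumerRecord<String, GenericRecord> consumerRecord, Acknowledgment ack) {
    projectReactorConsumerUtil.acknowledgeCompletableWhenDone(LOGGER, ack, consumerRecord, this::receive);
}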
