Ich lerne, wie man Kafka mit der Feder für Apache Kafka verwendet. Zwei Kafka-Knoten, die auf meiner lokalen Maschine ausgeführt werden, Kafka-Version: 3.8.1. Ich verwende JsonSerializer.class für den Wert und den nicht blockierenden (async) Absender, da ich die Arbeit des Produzenten erst blockieren möchte, wenn er die ACKs erhält. Ich veriere mich (nur protokollieren, sonst nichts) das Ergebnis in der Zukunft. Dass ich erstellt habe, funktioniert gut, kann ich Nachrichten an das Thema Kafka senden und sehe sie mit einem Kafka-Thema-Browser-Tool (KAFKIO) richtig. Ich habe die Herstellungsversuche basierend auf mehreren Dokumenten und Artikeln konfiguriert, beispielsweise dieses DOC. Aber leider kann ich nichts im Protokoll im Zusammenhang mit dem Wiederholung sehen, und ich denke
[*] Starten Sie meine Kafka -Server < /li>
Starten Sie meine Spring -App mit dem Nachrichtenproduzenten < /li>
Senden Sie a Nur wenige Nachrichten zum Thema, es funktioniert wie ein Zauber, Nachrichten finden Sie zum Thema < /li>
Ich stoppt meine Kafka -Server (Booth) auf meinem Localhost. < /li>
< LI> Versuch, eine neue Nachricht an das Thema zu senden < /li>
Ich gehe davon aus > Ich sehe die Ausnahme einmal im Protokoll, aber ich kann keinen Wiederholung sehen: < /p>
@Bean
public ProducerFactory producerFactory() {
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(producerConfiguration());
factory.setProducerPerThread(false);
return factory;
}
@Bean
public KafkaTemplate kafkaTemplate() {
var factory = producerFactory();
log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
return new KafkaTemplate(factory);
}
Ich lerne, wie man Kafka mit der Feder für Apache Kafka verwendet. Zwei Kafka-Knoten, die auf meiner lokalen Maschine ausgeführt werden, Kafka-Version: 3.8.1. Ich verwende JsonSerializer.class für den Wert und den nicht blockierenden (async) Absender, da ich die Arbeit des Produzenten erst blockieren möchte, wenn er die ACKs erhält. Ich veriere mich (nur protokollieren, sonst nichts) das Ergebnis in der Zukunft. Dass ich erstellt habe, funktioniert gut, kann ich Nachrichten an das Thema Kafka senden und sehe sie mit einem Kafka-Thema-Browser-Tool (KAFKIO) richtig. Ich habe die Herstellungsversuche basierend auf mehreren Dokumenten und Artikeln konfiguriert, beispielsweise dieses DOC. Aber leider kann ich nichts im Protokoll im Zusammenhang mit dem Wiederholung sehen, und ich denke [b] [*] Starten Sie meine Kafka -Server < /li> Starten Sie meine Spring -App mit dem Nachrichtenproduzenten < /li> Senden Sie a Nur wenige Nachrichten zum Thema, es funktioniert wie ein Zauber, Nachrichten finden Sie zum Thema < /li> Ich stoppt meine Kafka -Server (Booth) auf meinem Localhost. < /li> < LI> Versuch, eine neue Nachricht an das Thema zu senden < /li> Ich gehe davon aus > Ich sehe die Ausnahme einmal im Protokoll, aber ich kann keinen Wiederholung sehen: < /p> [code]org.springframework.kafka.core.KafkaProducerException: Failed to send ... Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation ... < /code> [b] Was vermisse ich in meinem Code? Warum funktioniert der Wiederholung des Fehlers bei Fehler nicht? private Map producerConfiguration() { Map configs = new HashMap(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
@Bean public KafkaTemplate kafkaTemplate() { var factory = producerFactory(); log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory)); return new KafkaTemplate(factory); } [/code] [b] So erstelle ich das Thema: [/b] [code]@Bean public KafkaAdmin admin() { Map configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); return new KafkaAdmin(configs); }
@Bean public NewTopic topic() { return TopicBuilder.name("incoming") .partitions(3) .replicas(2) .build(); } [/code] [b] Und schließlich sende ich die Nachricht: [/b] [code]public void onSend(Event event) { try { log.debug("sending message to kafka: {topic: \"{}\", payload: {}}", kafkaTopic, event); ProducerRecord record = new ProducerRecord(kafkaTopic, event); kafkaTemplate.send(record). whenComplete((result, ex) -> { if (ex == null) { log.info( "message successfully sent to kafka: {topic: \"{}\", partition: {}, offset: {}, key: \"{}\", value: \"{}\"}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset(), result.getProducerRecord().key(), result.getProducerRecord().value()); } else { var cause = ex.getCause(); var isRetryable = cause instanceof RetriableException; log.error( "failed to send message to kafka: {topic: \"{}\", event: {}, error: {}, isRetryable: {}}", kafkaTopic, event.toString(), cause.getMessage(), isRetryable, ex); }}); } catch (Throwable ex) { log.error( "Unable to send message to kafka topic: {topic: \"{}\", event: {}}", kafkaTopic, event.toString(), ex); } } [/code] [b] und Ergebnisprotokoll: [/b] [code]2025-01-31T13:19:52.183Z INFO 203 --- [gombi-kafka-producer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8443 (https) 2025-01-31T13:19:52.209Z INFO 203 --- [gombi-kafka-producer] [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat] 2025-01-31T13:19:52.210Z INFO 203 --- [gombi-kafka-producer] [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.34] 2025-01-31T13:19:52.271Z INFO 203 --- [gombi-kafka-producer] [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext 2025-01-31T13:19:52.272Z INFO 203 --- [gombi-kafka-producer] [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3738 ms 2025-01-31T13:19:53.048Z DEBUG 203 --- [gombi-kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : initializing a KafkaTemplate using the following setting: {"retries": "2147483647", "enable.idempotence": "true", "retry.backoff.max.ms": "5000", "value.serializer": "class org.springframework.kafka.support.serializer.JsonSerializer", "request.timeout.ms": "10000", "acks": "all", "bootstrap.servers": "kafka-1.hello.com:9092, kafka-2.hello.com:9092", "delivery.timeout.ms": "15000", "retry.backoff.ms": "1000", "key.serializer": "class org.apache.kafka.common.serialization.StringSerializer", "linger.ms": "0"}
>
2025-01-31T13:19:53.181Z DEBUG 203 --- [gombi-kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : creating a new kafka topic: {name: "incoming", partitions: 3, replicas: 2} 2025-01-31T13:19:55.083Z WARN 203 --- [gombi-kafka-producer] [ main] iguration$LoadBalancerCaffeineWarnLogger : Spring Cloud LoadBalancer is currently working with the default cache. While this cache implementation is useful for development and tests, it's recommended to use Caffeine cache in production.You can switch to using Caffeine cache, by adding it and org.springframework.cache.caffeine.CaffeineCacheManager to the classpath. 2025-01-31T13:19:55.103Z INFO 203 --- [gombi-kafka-producer] [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 3 endpoints beneath base path '/actuator' 2025-01-31T13:19:55.162Z INFO 203 --- [gombi-kafka-producer] [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values: auto.include.jmx.reporter = true bootstrap.controllers = [] bootstrap.servers = [kafka-1.hello.com:9092, kafka-2.hello.com:9092] client.dns.lookup = use_all_dns_ips client.id = gombi-kafka-producer-admin-0 connections.max.idle.ms = 300000 default.api.timeout.ms = 60000 enable.metrics.push = true metadata.max.age.ms = 300000 metadata.recovery.strategy = none metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.max.ms = 1000 retry.backoff.ms = 100 ... send.buffer.bytes = 131072 socket.connection.setup.timeout.max.ms = 30000 socket.connection.setup.timeout.ms = 10000 ssl.cipher.suites = null ...
2025-01-31T13:19:55.306Z INFO 203 --- [gombi-kafka-producer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.8.1 2025-01-31T13:19:55.306Z INFO 203 --- [gombi-kafka-producer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 70d6ff42debf7e17 2025-01-31T13:19:55.307Z INFO 203 --- [gombi-kafka-producer] [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1738329595305 2025-01-31T13:19:55.637Z WARN 203 --- [gombi-kafka-producer] [roducer-admin-0] o.a.k.clients.admin.KafkaAdminClient : [AdminClient clientId=gombi-kafka-producer-admin-0] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. 2025-01-31T13:19:55.924Z INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for gombi-kafka-producer-admin-0 unregistered 2025-01-31T13:19:55.929Z INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics : Metrics scheduler closed 2025-01-31T13:19:55.930Z INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter 2025-01-31T13:19:55.931Z INFO 203 --- [gombi-kafka-producer] [roducer-admin-0] o.apache.kafka.common.metrics.Metrics : Metrics reporters closed 2025-01-31T13:19:56.088Z INFO 203 --- [gombi-kafka-producer] [ main] o.a.t.util.net.NioEndpoint.certificate : Connector [https-jsse-nio-8443], TLS virtual host [_default_], certificate type [UNDEFINED] configured from keystore [/root/.keystore] using alias [kafka-producer-service-1.hello.com] with trust store [null] 2025-01-31T13:19:56.103Z INFO 203 --- [gombi-kafka-producer] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8443 (https) with context path '/' 2025-01-31T13:19:56.112Z INFO 203 --- [gombi-kafka-producer] [ main] o.s.c.c.s.ConsulServiceRegistry : Registering service with consul: NewService{id='gombi-kafka-producer', name='gombi-kafka-producer', tags=[service, 0.2.0], address='kafka-producer-service-1.hello.com', meta={secure=true}, port=8443, enableTagOverride=null, check=Check{script='null', dockerContainerID='null', shell='null', interval='2s', ttl='null', http='https://kafka-producer-service-1.hello.com:8443/actuator/health', method='null', header={}, tcp='null', timeout='2s', deregisterCriticalServiceAfter='10s', tlsSkipVerify=null, status='null', grpc='null', grpcUseTLS=null}, checks=null} 2025-01-31T13:19:56.155Z INFO 203 --- [gombi-kafka-producer] [ main] c.r.g.s.message.producer.Application : Started Application in 13.263 seconds (process running for 14.104)
2025-01-31T13:20:36.805Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.c.m.MethodStatisticsAspect : > calling the KafkaProducerController.sendOneMessage()... 2025-01-31T13:20:36.805Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.s.m.p.s.KafkaProducerService : sending message to kafka: {topic: "incoming", payload: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:36.805"} 2025-01-31T13:20:36.806Z DEBUG 203 --- [gombi-kafka-producer] [nio-8443-exec-8] c.r.g.c.m.MethodStatisticsAspect : < call ended: {name: KafkaProducerController.sendOneMessage, return: "A message has been sent to the incoming[/b] Kafka topic.", execution-in-ms: 1} 2025-01-31T13:20:41.043Z INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 9314 ms. 2025-01-31T13:20:46.218Z INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 9375 ms. 2025-01-31T13:20:51.808Z ERROR 203 --- [gombi-kafka-producer] [ucer-producer-1] c.r.g.s.m.p.s.KafkaProducerService : failed to send message to kafka: {topic: "incoming", event: {sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", createdInUtc: "2025-01-31 13:20:36.805", error: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation, isRetryable: true}
org.springframework.kafka.core.KafkaProducerException: Failed to send at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$9(KafkaTemplate.java:891) ~[spring-kafka-3.3.1.jar!/:3.3.1] at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer$1.onCompletion(DefaultKafkaProducerFactory.java:1111) ~[spring-kafka-3.3.1.jar!/:3.3.1] at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1567) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:426) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:347) ~[kafka-clients-3.8.1.jar!/:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) ~[kafka-clients-3.8.1.jar!/:na] at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation
2025-01-31T13:20:51.811Z ERROR 203 --- [gombi-kafka-producer] [ucer-producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{sourceSystem: "payment-service", owner: "amelia", payload: "{"comment": "default message"}", create...' to topic incoming:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-2:15000 ms has passed since batch creation
>
2025-01-31T13:21:08.288Z INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 21930 ms. 2025-01-31T13:21:33.112Z INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 19813 ms. 2025-01-31T13:22:02.355Z INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 2 due to socket connection setup timeout. The timeout value is 24229 ms. 2025-01-31T13:22:37.368Z INFO 203 --- [gombi-kafka-producer] [ucer-producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=gombi-kafka-producer-producer-1] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 30000 ms. ... [/code] [b] Sie können den SRC hier überprüfen: [/b]
Ich lerne, wie man Kafka mit der Feder für Apache Kafka verwendet. Zwei Kafka-Knoten, die auf meiner lokalen Maschine ausgeführt werden, Kafka-Version: 3.8.1. Ich verwende JsonSerializer.class für...
Ich habe derzeit das Problem, dass ich nach einem Artefakt in Jenkins suche. Wenn dieses Artefakt nicht gefunden werden kann, sollte ein 404 zurückkommen. Bisher funktioniert das ganz gut. Leider...
Was ich erreichen möchte:
Ich möchte die wiedergegebene Nachricht mit Spring Kafka identifizieren, wenn ich die wiedergegebene Nachricht erzeugt. /> Leider ist der Kafka -Cluster schuppig, aber...
Ich habe eine .NET -Anwendung, die die Confluent Client -Bibliothek verwendet, um Nachrichten mit einem Transaktionsproduzenten an Kafka zu senden. Es verwendet einen Pool von Produzenten, um...
Dies hängt in gewisser Weise mit meinem tatsächlichen Code zusammen. Die in Pytest Fixture verwendete Async-Kontextmanagerklasse gibt ein Async-Generatorobjekt anstelle der erwarteten...