Wie kann ich das Problem beheben, dass org.apache.kafka.clients.NetworkClient Knoten -1 nicht verbunden ist?Java

Java-Forum
Guest
 Wie kann ich das Problem beheben, dass org.apache.kafka.clients.NetworkClient Knoten -1 nicht verbunden ist?

Post by Guest »

Hier geht es um den Kafka-Hörer. Wir haben keinen Zugriff auf den Herausgeber (es ist ein Drittanbieter).
Genau 9 Minuten nach dem Start der App erhalten wir Folgendes in unseren Spring Boot-Anwendungsprotokollen:

Code: Select all

org.apache.kafka.clients.NetworkClient [] => [Consumer clientId=consumer-xx, groupId=xx] Node -1 disconnected.
Interessanterweise steht im Konfigurations-Dump, den Kafka beim Start erstellt, „connections.max.idle.ms = 540000“. Das sind 9 Minuten. Aber nur weil 9 Minuten lang keine Nachrichten eingegangen sind, sollte die App nicht kaputt gehen und der Listener gestoppt werden!
Das Seltsame ist, obwohl wir die Fehlermeldung erhalten, dass der Verbraucher die Verbindung getrennt hat Nach 9 Minuten erhält der Hörer immer noch Nachrichten. Wenn der Verbraucher stillschweigend die Verbindung wiederherstellt, würden wir erwarten, dass 9 Minuten nach der letzten Nachricht eine weitere Trennungsmeldung angezeigt wird, was jedoch nicht der Fall ist. Dieses Verhalten erscheint nicht logisch.
Unsere Konfiguration sieht so aus:

Code: Select all

  kafka:
ssl:
key-password: xxx
keystore-location: ${KEYSTORE_LOCATION:xxx.keystore.jks}
keystore-password: xxx
truststore-location: ${TRUSTORE_LOCATION:file:xxx.truststore.jks}
truststore-password: xxx
consumer:
bootstrap-servers: ${KAFKA_ENDPOINT:xxx.com:9092}
group-id: ${KAFKA_GROUP:xxx}
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: ${KAFKA_SESSION_TIMEOUT:10000}
security.protocol: SSL
security.inter.broker.protocol: SSL
sasl.mechanism: https
Spring Boot 2.7. Java 8
Mir ist aufgefallen, dass wir beim Starten der App Folgendes in den Protokollen erhalten:

Code: Select all

2025-01-13T11:49:59,842Z INFO  restartedMain kafka.clients.consumer.ConsumerConfig [] => ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xxx.com:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-xxx-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = https
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = xxxkeystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = xxxtruststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post