Was ist die sauberere und effizientere Möglichkeit, den Gesundheitscheck für Kafka in meiner Spring-Boot-Anwendung zu imJava

Java-Forum
Guest
 Was ist die sauberere und effizientere Möglichkeit, den Gesundheitscheck für Kafka in meiner Spring-Boot-Anwendung zu im

Post by Guest »

Ich habe eine Spring-Boot-Anwendung (2.1.6), die Nachrichten sowohl konsumiert als auch an eine (organisationsweite) gemeinsame Kafka-Instanz sendet. Ich versuche, Integritätsprüfungen für diesen Kafka-Broker mithilfe eines Federaktuators in meiner Anwendung zu implementieren, und stehe vor einer Reihe von Problemen im Zusammenhang mit Leistung und Protokollierung. In Spring Boot 2.0 war ein Gesundheitsindikator integriert, der jedoch aufgrund offensichtlicher Probleme entfernt wurde.

Hier ist die Healthcheck-Klasse, die ich implementiert habe:

Code: Select all

@Component
public class KafkaHealthCheck implements HealthIndicator {

private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);

private KafkaAdmin kafkaAdmin;

private Map kafkaConfig;

@Value("${application.topic}")
private String topicName;

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;

}

@PostConstruct
public void setUpAdminClient() {
kafkaConfig = new HashMap();
kafkaConfig.putAll(kafkaAdmin.getConfig());
kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
}

@Override
public Health health() {
Long start = System.currentTimeMillis();
try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {

DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
adminClient.describeCluster(describeClusterOptions);

adminClient.describeConsumerGroups(List.of("topic")).all()
.get(2, TimeUnit.SECONDS);

Map topicDescriptionMap = adminClient
.describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);

List partitions = topicDescriptionMap.get(topicName)
.partitions();

if (partitions == null || partitions.isEmpty()) {
logger.warn(String
.format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
return Health.down()
.withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
.build();
} else {
if (partitions.stream().anyMatch(p -> p.leader() == null)) {
logger.warn(
String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
topicName));
return Health.down().withDetail("Kafka healthcheck failed",
"No partition leader found for topic: " + topicName).build();
}
}
} catch (Exception e) {
logger.warn("Kafka healthcheck failed", e);
return Health.down()
.withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
}
System.out.println(System.currentTimeMillis() - start);
return Health.up().build();
}
}
Das sind nun die Fragen, die ich habe, oder die Probleme, mit denen ich bei dieser Implementierung konfrontiert bin:

1 – Die KafkaAdmin wird in dieser Klasse mit der gesamten Konfiguration eingefügt, die ich habe (ich verwende SSL), mit Ausnahme der „bootstrap.servers“. Ich habe herausgefunden, dass org.springframework.boot.autoconfigure.kafka.KafkaProperties localhost:9092 als Standard hat, was irgendwie nicht von der Anwendungskonfiguration überschrieben wurde, obwohl es für Verbraucher und Produzenten gut funktioniert. Ich habe keine Ahnung, warum das so ist, und deshalb muss ich es hier manuell einrichten.

2 – Ich habe Zeitüberschreitungen zu DescribeClusterOptions und beschreibenConsumerGroups hinzugefügt aber diese Zeitüberschreitungen scheinen völlig ignoriert zu werden. Wenn ich den Broker manuell herunterfahre, dauert es etwa ein paar Minuten, bis die Gesundheitsprüfung einen Fehler meldet.

3 – Aufgrund des Bootstrap.servers-Fehlers, als ich den tatsächlich bereitgestellt habe In der Anwendung wurde mein Protokollserver fast zerstört, da Millionen von Protokollzeilen von org.apache.kafka.clients.NetworkClient generiert wurden und besagten, dass die Verbindung zu Knoten -1 nicht hergestellt werden konnte. Der Broker ist möglicherweise nicht verfügbar.. Wie kann ich verhindern, dass es tatsächlich noch einmal passiert? Auch in Fällen, in denen der Broker während des Betriebs ausfällt.

4 – Selbst die erfolgreiche Gesundheitsprüfung generiert viele Protokollzeilen, wenn ich AdminClient erstelle. Es meldet die gesamte gelesene Konfiguration und eine Reihe anderer Anweisungen ab. Gibt es eine Chance, es zu minimieren?

5 – Insgesamt ist dies sehr langsam. Ich habe versucht, die Zeit zu berechnen, die nur für die Durchführung dieses Gesundheitschecks benötigt wird, und sie liegt im Durchschnitt bei etwa 1,5 Sekunden. Gibt es eine Chance, es zu optimieren?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post