Hier ist die Healthcheck-Klasse, die ich implementiert habe:
Code: Select all
@Component
public class KafkaHealthCheck implements HealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);
private KafkaAdmin kafkaAdmin;
private Map kafkaConfig;
@Value("${application.topic}")
private String topicName;
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;
}
@PostConstruct
public void setUpAdminClient() {
kafkaConfig = new HashMap();
kafkaConfig.putAll(kafkaAdmin.getConfig());
kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
}
@Override
public Health health() {
Long start = System.currentTimeMillis();
try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {
DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
adminClient.describeCluster(describeClusterOptions);
adminClient.describeConsumerGroups(List.of("topic")).all()
.get(2, TimeUnit.SECONDS);
Map topicDescriptionMap = adminClient
.describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);
List partitions = topicDescriptionMap.get(topicName)
.partitions();
if (partitions == null || partitions.isEmpty()) {
logger.warn(String
.format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
return Health.down()
.withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
.build();
} else {
if (partitions.stream().anyMatch(p -> p.leader() == null)) {
logger.warn(
String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
topicName));
return Health.down().withDetail("Kafka healthcheck failed",
"No partition leader found for topic: " + topicName).build();
}
}
} catch (Exception e) {
logger.warn("Kafka healthcheck failed", e);
return Health.down()
.withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
}
System.out.println(System.currentTimeMillis() - start);
return Health.up().build();
}
}
1 – Die KafkaAdmin wird in dieser Klasse mit der gesamten Konfiguration eingefügt, die ich habe (ich verwende SSL), mit Ausnahme der „bootstrap.servers“. Ich habe herausgefunden, dass org.springframework.boot.autoconfigure.kafka.KafkaProperties localhost:9092 als Standard hat, was irgendwie nicht von der Anwendungskonfiguration überschrieben wurde, obwohl es für Verbraucher und Produzenten gut funktioniert. Ich habe keine Ahnung, warum das so ist, und deshalb muss ich es hier manuell einrichten.
2 – Ich habe Zeitüberschreitungen zu DescribeClusterOptions und beschreibenConsumerGroups hinzugefügt aber diese Zeitüberschreitungen scheinen völlig ignoriert zu werden. Wenn ich den Broker manuell herunterfahre, dauert es etwa ein paar Minuten, bis die Gesundheitsprüfung einen Fehler meldet.
3 – Aufgrund des Bootstrap.servers-Fehlers, als ich den tatsächlich bereitgestellt habe In der Anwendung wurde mein Protokollserver fast zerstört, da Millionen von Protokollzeilen von org.apache.kafka.clients.NetworkClient generiert wurden und besagten, dass die Verbindung zu Knoten -1 nicht hergestellt werden konnte. Der Broker ist möglicherweise nicht verfügbar.. Wie kann ich verhindern, dass es tatsächlich noch einmal passiert? Auch in Fällen, in denen der Broker während des Betriebs ausfällt.
4 – Selbst die erfolgreiche Gesundheitsprüfung generiert viele Protokollzeilen, wenn ich AdminClient erstelle. Es meldet die gesamte gelesene Konfiguration und eine Reihe anderer Anweisungen ab. Gibt es eine Chance, es zu minimieren?
5 – Insgesamt ist dies sehr langsam. Ich habe versucht, die Zeit zu berechnen, die nur für die Durchführung dieses Gesundheitschecks benötigt wird, und sie liegt im Durchschnitt bei etwa 1,5 Sekunden. Gibt es eine Chance, es zu optimieren?