Code: Select all
@PostMapping("/question")
public String question(@RequestBody List messages) {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (String message : messages) {
executor.submit(() -> {
kafkaTemplate.send("topic-loom-micrometer", message + Thread.currentThread().getName());
});
}
executor.shutdown();
}
Das Ziel besteht darin, alle Nachrichten mithilfe virtueller Threads parallel an Kafka zu senden.
Seitdem Da es viele, viele Nachrichten gibt, nutzt der Code hier virtuelle Threads, um die Parallelität zu erhöhen, da eine herkömmliche for-Schleife Nachrichten eine nach der anderen senden würde, was viel länger dauert.
Das Ziel besteht darin, die zu beobachten Spuren, trotz Verwendung virtueller Threads.
Das Dokument von Micrometer ist https://docs.micrometer.io/micrometer/r ... components
Es gibt Herausforderungen bei der Anpassung an mein Stück Code:
1 – Im Dokument wird Executors.newCachedThreadPool() erwähnt, ich bin mir nicht sicher, ob es für virtuelle Threads funktionieren würde.
2 – Die Verwendung von then(registry.getCurrentObservation()).isSameAs(parent);, nicht sicher, woher es kommt.
3 – Die Die Verwendung von ContextExecutorService.wrap(executor) scheint veraltet zu sein. Aber das Dokument sagt nicht, was dafür verwendet werden soll.
4 - Ich habe den Code wie folgt angepasst:
Code: Select all
@PostMapping("/question2")
public String question2(@RequestBody List messages) {
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
Observation parent = Observation.createNotStarted("parent", registry);
for (String message : messages) {
parent.observe(() -> {
ContextExecutorService.wrap(executor).submit(() -> {
return Observation.createNotStarted("child", registry)
.observe(() -> {
kafkaTemplate.send("topic-loom-micrometer", message + Thread.currentThread().getName());
});
}
);
});
}
executor.shutdown();
}
A – Ist die Anpassung korrekt, da ich eine for-Schleife verwende?
B – Es scheint, dass ein Callable erforderlich ist. Wie lässt sich der Code kompilieren?