StreamBridge-Bindung anstelle von EnableBinding- und Output-Anmerkungen, die seit Version 3.1 von Spring Cloud Stream veJava

Java-Forum
Anonymous
 StreamBridge-Bindung anstelle von EnableBinding- und Output-Anmerkungen, die seit Version 3.1 von Spring Cloud Stream ve

Post by Anonymous »

Ich habe mehrere APIs, die über Kafka miteinander kommunizieren (Nachrichten produzieren und konsumieren). In einer der APIs erzeuge ich Nachrichten basierend auf einem HTTP-Anforderungsauslöser (wenn ein Endpunkt aufgerufen wird, wird eine Nachricht erstellt und an Kafka gesendet) mit den Annotationen @Output und @EnableBinding. Diese Nachrichten werden von anderen APIs verwendet, die dieses Thema abonnieren.
Jetzt versuche ich, auf das neue funktionale Programmiermodell von Spring Cloud Stream zu migrieren, und aus der Dokumentation bin ich zu dem Schluss gekommen, dass StreamBridge mit externen Quelldaten der für meinen Fall erforderliche Ansatz ist (https://docs.spring.io/spring-cloud-str ... en_sources). Allerdings habe ich nicht verstanden, wie Quelle, Bindungsname und Zielthema im Hinblick auf die Namenskonvention richtig konfiguriert werden sollten, wenn keine Quellfunktion definiert ist. Ich habe die folgende Konfiguration, die erfolgreich Nachrichten auf „myFooTopic“ erzeugt, aber beim Start der Anwendung sind mir einige seltsame Protokolle aufgefallen, da die Bindung anscheinend nicht richtig durchgeführt wurde:
application.properties:

Code: Select all

spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry

spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1

spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']

spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
Darüber hinaus lautet der Code für StreamBridge:

Code: Select all

@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {

private final StreamBridge streamBridge;
private static final String USER = "user";

public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");

try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}

private Message buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();

return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
und unter den verwendeten Abhängigkeiten:
  • spring-boot 2.6.2
  • kafka-clients 2.8.1
  • spring-cloud-stream 3.2.1
  • spring-cloud-stream-binder-kafka 3.2.1
  • spring-integration-kafka 5.5.8
Meine Fragen sind:
  • Ist die Bindung des Produzenten an myUserTopic korrekt oder muss ich die Eigenschaft spring.cloud.stream.source=user hinzufügen?
  • "user" bindingName ist korrekt oder es sollte die Konvention als „user-out-0“ respektieren, wenn man bedenkt, dass ich Ist kein Bean-Lieferant konfiguriert?
  • Wenn die erste Nachricht nach dem Start der Anwendung erzeugt wird, kann ich die folgenden Protokolle sehen:

Code: Select all

    Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s).  (which is strange)
Für die folgenden erzeugten Nachrichten wird das Protokoll „unknown.channel.name“ nicht erneut angezeigt.
Ich verstehe nicht, warum der Name des Kanals „unknown“ anstelle des in der application.properties-Konfiguration bereitgestellten Ausgabebindungsnamens „user“ lautet. Können Sie mir helfen, herauszufinden, ob auf meiner Seite eine Fehlkonfiguration vorliegt? Alle Spring-Cloud-Stream-Beispiele aus der Dokumentation und Github verwenden StreamBridge entweder mit dynamischen Zielen oder SupplierConfiguration.

Bearbeiten:
Ich habe eine weitere Frage zum Testen der oben genannten Konfiguration. Ich habe versucht, einen Test für diesen speziellen Anwendungsfall von StreamBridge zu schreiben, indem ich den Beispielen von hier gefolgt bin ([https://github.com/spring-cloud/spring- ... .java#L716][1]) und passen Sie es für meine obige application.properties-Konfiguration und an Aus irgendeinem Grund ist die in der outputDestination empfangene Nachricht für diesen Test null:

Code: Select all

public class StreamBridgeTests {

@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {

HelloEventAvro helloEventAvro = buildHelloEventAvro();

Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();

StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
Ich habe mich eingehender mit dem Debuggen befasst und es scheint, dass das Ausgabeziel mit einem Kanal (myUserTopic.destination) und 2 messageQueues anstelle von 1 erstellt wird, wie unten erwähnt:
  • user.destination mit Größe 0 (wobei "user" = mein Bindungsname)
  • myUserTopic.destination mit Größe 1 und die erzeugte helloEventAvroMessage
Wenn ich den bindingName von „user“ in den Namen des Kafka-Themas „myUserTopic“ in der Methode „receive()“ von OutputDestination ändere, funktioniert es wie erwartet: Ein Kanal wird erstellt (myUserTopic.destination) und 1 messageQueue (myUserTopic.destination):

Code: Select all

public class StreamBridgeTests {

@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {

HelloEventAvro helloEventAvro = buildHelloEventAvro();

Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();

StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
In Anbetracht des oben Gesagten verstehe ich immer noch nicht, wie der bindingName aus der send()-Methode von StreamBridge in Korrelation mit der require()-Methode von OutputDestination funktioniert (sollte nicht an beiden Stellen derselbe Name sein, wenn ich nur ein Thema habe? ) und wie er in den Kafka-Topic-Zielnamensatz aufgelöst wird.
[1]: https://github.com/spring-cloud/spring- ... .java#L716)

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post