Jetzt versuche ich, auf das neue funktionale Programmiermodell von Spring Cloud Stream zu migrieren, und aus der Dokumentation bin ich zu dem Schluss gekommen, dass StreamBridge mit externen Quelldaten der für meinen Fall erforderliche Ansatz ist (https://docs.spring.io/spring-cloud-str ... en_sources). Allerdings habe ich nicht verstanden, wie Quelle, Bindungsname und Zielthema im Hinblick auf die Namenskonvention richtig konfiguriert werden sollten, wenn keine Quellfunktion definiert ist. Ich habe die folgende Konfiguration, die erfolgreich Nachrichten auf „myFooTopic“ erzeugt, aber beim Start der Anwendung sind mir einige seltsame Protokolle aufgefallen, da die Bindung anscheinend nicht richtig durchgeführt wurde:
application.properties:
Code: Select all
spring.kafka.bootstrap-servers=listOfServers for kafka
spring.kafka.producer.value-serializer= MyCustomKafkaPayloadAvroSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.properties.schema.registry.url=listOfServers for schema registry
spring.cloud.stream.bindings.user.destination=myUserTopic
spring.cloud.stream.bindings.user.producer.useNativeEncoding=true
spring.cloud.stream.bindings.user.producer.partitionCount=1
spring.cloud.stream.bindings.user.producer.partitionKeyExpression=headers['partitionId']
spring.cloud.stream.kafka.binder.autoCreateTopics=false
spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.type=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=
spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=
Code: Select all
@Component
@RequiredArgsConstructor
@Slf4j
public class EventPublisher {
private final StreamBridge streamBridge;
private static final String USER = "user";
public void sendToChannel(String message) {
log.info("sendToChannel - sending to channel hello event");
try {
if (streamBridge.send(USER, buildChannelMessage(message))) {
log.info("sendToChannel - message was successfully sent ");
} else {
log.error("sendToChannel - failed to send message");
}
} catch (Exception e) {
log.error("sendToChannel - error while sending message on output binding {}", USER, e);
}
}
private Message buildChannelMessage(String message) {
HelloEventAvro helloEventAvro = HelloEventAvro.newBuilder()
.setHelloMessage(message)
.build();
long timestamp = Instant.now().toEpochMilli();
return MessageBuilder.withPayload(helloEventAvro)
.setHeader("partitionId", 1)
.setHeader("X-Timestamp", timestamp)
.build();
}
}
- spring-boot 2.6.2
- kafka-clients 2.8.1
- spring-cloud-stream 3.2.1
- spring-cloud-stream-binder-kafka 3.2.1
- spring-integration-kafka 5.5.8
- Ist die Bindung des Produzenten an myUserTopic korrekt oder muss ich die Eigenschaft spring.cloud.stream.source=user hinzufügen?
- "user" bindingName ist korrekt oder es sollte die Konvention als „user-out-0“ respektieren, wenn man bedenkt, dass ich Ist kein Bean-Lieferant konfiguriert?
- Wenn die erste Nachricht nach dem Start der Anwendung erzeugt wird, kann ich die folgenden Protokolle sehen:
Code: Select all
Using kafka topic for outbound: myUserTopic (which is correct)
Caching the binder: kafka
Retrieving cached binder: kafka
.....
Channel 'unknown.channel.name' has 1 subscriber(s). (which is strange)
Ich verstehe nicht, warum der Name des Kanals „unknown“ anstelle des in der application.properties-Konfiguration bereitgestellten Ausgabebindungsnamens „user“ lautet. Können Sie mir helfen, herauszufinden, ob auf meiner Seite eine Fehlkonfiguration vorliegt? Alle Spring-Cloud-Stream-Beispiele aus der Dokumentation und Github verwenden StreamBridge entweder mit dynamischen Zielen oder SupplierConfiguration.
Bearbeiten:
Ich habe eine weitere Frage zum Testen der oben genannten Konfiguration. Ich habe versucht, einen Test für diesen speziellen Anwendungsfall von StreamBridge zu schreiben, indem ich den Beispielen von hier gefolgt bin ([https://github.com/spring-cloud/spring- ... .java#L716][1]) und passen Sie es für meine obige application.properties-Konfiguration und an Aus irgendeinem Grund ist die in der outputDestination empfangene Nachricht für diesen Test null:
Code: Select all
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "user");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
- user.destination mit Größe 0 (wobei "user" = mein Bindungsname)
- myUserTopic.destination mit Größe 1 und die erzeugte helloEventAvroMessage
Code: Select all
public class StreamBridgeTests {
@Test
public void testSendingMessageToDestination() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(Application.class))
.web(WebApplicationType.NONE).run()) {
HelloEventAvro helloEventAvro = buildHelloEventAvro();
Message helloEventAvroMessage = MessageBuilder
.withPayload(helloEventAvro)
.setHeader(CustomKafkaHeaders.PARTITION_ID.value(), helloEventAvro.getId())
.build();
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("user", helloEventAvroMessage);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message message = outputDestination.receive(100, "myUserTopic");
assertThat(new String(message.getPayload())).contains("hello");
}
}
}
[1]: https://github.com/spring-cloud/spring- ... .java#L716)
Mobile version