Spring-Integration: Aggregatormuster – eine Möglichkeit für nachgeschaltete Komponenten, festzustellen, ob die Gruppe vo
Posted: 13 Jan 2025, 19:24
Wir verwenden das Aggregatormuster Spring Integration 6.3.3 mit JDBCMessageStore und Ablauf-Timeout. Wir möchten die Teilgruppe auch bei Ablauf senden, was wir als Fehlerbedingung betrachten. Wir möchten protokollieren/warnen, wenn dieser Fehler auftritt.
Wie kann die Downstream-Komponente im IntegrationFlow am besten wissen, ob die aggregierten Nachrichten empfangen werden? Sind Nachrichten eine vollständige Gruppe oder ein Teil davon?
Hier ist ein vereinfachtes Beispiel:
Wenn der PetGroupHandler im obigen Beispiel eine Liste von Haustieren empfängt, gibt es eine Möglichkeit herauszufinden, ob die Liste von Haustieren vollständig ist? Gruppe oder Teil, sodass der Handler für eine vollständige Gruppe etwas anderes tun kann als für eine Teilgruppe?
Wir haben versucht, einen OutputProcessor zu implementieren, der einen booleschen Header einfügt, der angibt, ob die Gruppe vollständig ist. Dies funktioniert jedoch nicht, da es so aussieht, als ob die Gruppe in AbstractCorrelatingMessageHandler.java direkt nach dem Aufruf der Release-Strategie und vor dem Aufruf des OutputProcessor im CompleteGroup() Methode.
Wie kann die Downstream-Komponente im IntegrationFlow am besten wissen, ob die aggregierten Nachrichten empfangen werden? Sind Nachrichten eine vollständige Gruppe oder ein Teil davon?
Hier ist ein vereinfachtes Beispiel:
Code: Select all
@Autowired
PetGroupHandler petGroupHandler;
@Bean
public JdbcMessageStore jdbcMessageStore(
@Qualifier("mydbDataSource") DataSource dataSource) {
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
messageStore.setRegion("petstore-pubsub");
return messageStore;
}
@Bean
IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
.filter(petOfInterestFilter, "shouldProcess")
.aggregate(aggregatorSpec -> aggregatorSpec
.messageStore(jdbcMessageStore)
.outputProcessor(petOutputProcessor)
.expireGroupsUponCompletion(true)
.groupTimeout(300 * 1000) // 5 minutes
.sendPartialResultOnExpiry(true) // send partial group
.correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
.releaseStrategy(group -> group.size()>=2))
.handle(petGroupHandler, "handle")
.get();
Wir haben versucht, einen OutputProcessor zu implementieren, der einen booleschen Header einfügt, der angibt, ob die Gruppe vollständig ist. Dies funktioniert jedoch nicht, da es so aussieht, als ob die Gruppe in AbstractCorrelatingMessageHandler.java direkt nach dem Aufruf der Release-Strategie und vor dem Aufruf des OutputProcessor im CompleteGroup() Methode.
Code: Select all
if (this.releaseStrategy.canRelease(messageGroup)) {
Collection