Spring-Integration: Aggregatormuster – eine Möglichkeit für nachgeschaltete Komponenten, festzustellen, ob die Gruppe voJava

Java-Forum
Guest
 Spring-Integration: Aggregatormuster – eine Möglichkeit für nachgeschaltete Komponenten, festzustellen, ob die Gruppe vo

Post by Guest »

Wir verwenden das Aggregatormuster Spring Integration 6.3.3 mit JDBCMessageStore und Ablauf-Timeout. Wir möchten die Teilgruppe auch bei Ablauf senden, was wir als Fehlerbedingung betrachten. Wir möchten protokollieren/warnen, wenn dieser Fehler auftritt.
Wie kann die Downstream-Komponente im IntegrationFlow am besten wissen, ob die aggregierten Nachrichten empfangen werden? Sind Nachrichten eine vollständige Gruppe oder ein Teil davon?
Hier ist ein vereinfachtes Beispiel:

Code: Select all

    @Autowired
PetGroupHandler petGroupHandler;

@Bean
public JdbcMessageStore jdbcMessageStore(
@Qualifier("mydbDataSource") DataSource dataSource) {
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
messageStore.setRegion("petstore-pubsub");
return messageStore;
}

@Bean
IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
.filter(petOfInterestFilter, "shouldProcess")
.aggregate(aggregatorSpec -> aggregatorSpec
.messageStore(jdbcMessageStore)
.outputProcessor(petOutputProcessor)
.expireGroupsUponCompletion(true)
.groupTimeout(300 * 1000) // 5 minutes
.sendPartialResultOnExpiry(true) // send partial group
.correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
.releaseStrategy(group -> group.size()>=2))
.handle(petGroupHandler, "handle")
.get();
Wenn der PetGroupHandler im obigen Beispiel eine Liste von Haustieren empfängt, gibt es eine Möglichkeit herauszufinden, ob die Liste von Haustieren vollständig ist? Gruppe oder Teil, sodass der Handler für eine vollständige Gruppe etwas anderes tun kann als für eine Teilgruppe?
Wir haben versucht, einen OutputProcessor zu implementieren, der einen booleschen Header einfügt, der angibt, ob die Gruppe vollständig ist. Dies funktioniert jedoch nicht, da es so aussieht, als ob die Gruppe in AbstractCorrelatingMessageHandler.java direkt nach dem Aufruf der Release-Strategie und vor dem Aufruf des OutputProcessor im CompleteGroup() Methode.

Code: Select all

            if (this.releaseStrategy.canRelease(messageGroup)) {
Collection

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post