Wie kann die Downstream-Komponente im IntegrationFlow am besten wissen, ob die aggregierten Nachrichten empfangen werden? Sind Nachrichten eine vollständige Gruppe oder ein Teil davon?
Hier ist ein vereinfachtes Beispiel:
Code: Select all
@Autowired
PetGroupHandler petGroupHandler;
@Bean
public JdbcMessageStore jdbcMessageStore(
@Qualifier("mydbDataSource") DataSource dataSource) {
JdbcMessageStore messageStore = new JdbcMessageStore(dataSource);
messageStore.setRegion("petstore-pubsub");
return messageStore;
}
@Bean
IntegrationFlow petStoreSubscriptionFlow(JdbcMessageStore jdbcMessageStore, PetOutputProcessor petOutputProcessor) {
return IntegrationFlow.from("petStoreSubscriptionMessageChannel")
.filter(petOfInterestFilter, "shouldProcess")
.aggregate(aggregatorSpec -> aggregatorSpec
.messageStore(jdbcMessageStore)
.outputProcessor(petOutputProcessor)
.expireGroupsUponCompletion(true)
.groupTimeout(300 * 1000) // 5 minutes
.sendPartialResultOnExpiry(true) // send partial group
.correlationStrategy(message -> ((Pet) message.getPayload()).getBreed())
.releaseStrategy(group -> group.size()>=2))
.handle(petGroupHandler, "handle")
.get();
Wir haben versucht, einen OutputProcessor zu implementieren, der einen booleschen Header einfügt, der angibt, ob die Gruppe vollständig ist. Dies funktioniert jedoch nicht, da es so aussieht, als ob die Gruppe in AbstractCorrelatingMessageHandler.java direkt nach dem Aufruf der Release-Strategie und vor dem Aufruf des OutputProcessor im CompleteGroup() Methode.
Code: Select all
if (this.releaseStrategy.canRelease(messageGroup)) {
Collection