Gibt es eine Möglichkeit, aus einem Upstream-Fluss zwei Downstream-Flüsse zu erstellen, ohne CollectList() aufzurufen?
Posted: 20 Dec 2024, 16:15
Ich habe ein Upstream-Flussmittel mit Flux und möchte zwei unabhängige Downstream-Flussmittel erstellen, wobei Filter/FilterWhen on Flux 1 sich nicht auf Flux 2 auswirken würde. Das Downstream-Flussmittel würde wie folgt aussehen
Flux itemFlux = Flux.fromIterable(someItemList)
.transform(upstreamFlux ->
Tuples.of(
// create Flux 1
Flux.from(upstreamFlux)
.filter(someFilter...)
.filterWhen(anotherFilter...)
// create Flux 2
Flux.from(upstreamFlux),
)
)
Es ist derzeit möglich, meine Ergebnisse abzurufen, aber ich müsste CollectList() aufrufen und Flux 1 warten lassen, bis alle Elemente vom Upstream gesammelt wurden.
Flux itemFlux = Flux.fromIterable(someItemList)
.collectList()
.flatMap(collectedItemList ->
Tuples.of(
Flux.from(collectedItemList )
.filter(someFilter...)
.filterWhen(anotherFilter...)
.collectList()
Flux.from(collectedItemList).collectList()
)
)
Um weiteren Kontext bereitzustellen: Der Upstream-Fluss hat bereits umfangreiche Berechnungen durchlaufen und viele HTTP-Anfragen gesendet, sodass ich den Upstream-Fluss im Idealfall nicht neu erstellen möchte. Ich habe mir Flux.publish() und Flux.cache() angesehen, weiß aber nicht genau, wie ich sie für dieses spezielle Szenario nutzen kann.
- Flux 1
- Für jedes Element gäbe es filter und filterWhen auf jedes Element anwenden, um es aus dem Flux-Stream herauszufiltern, und dies sollte nicht der Fall sein beeinflussen Flux 2
- Flux 2
- Die endgültige Ausgabe davon wäre äquivalent zu Flux.collectList()
Flux itemFlux = Flux.fromIterable(someItemList)
.transform(upstreamFlux ->
Tuples.of(
// create Flux 1
Flux.from(upstreamFlux)
.filter(someFilter...)
.filterWhen(anotherFilter...)
// create Flux 2
Flux.from(upstreamFlux),
)
)
Es ist derzeit möglich, meine Ergebnisse abzurufen, aber ich müsste CollectList() aufrufen und Flux 1 warten lassen, bis alle Elemente vom Upstream gesammelt wurden.
Flux itemFlux = Flux.fromIterable(someItemList)
.collectList()
.flatMap(collectedItemList ->
Tuples.of(
Flux.from(collectedItemList )
.filter(someFilter...)
.filterWhen(anotherFilter...)
.collectList()
Flux.from(collectedItemList).collectList()
)
)
Um weiteren Kontext bereitzustellen: Der Upstream-Fluss hat bereits umfangreiche Berechnungen durchlaufen und viele HTTP-Anfragen gesendet, sodass ich den Upstream-Fluss im Idealfall nicht neu erstellen möchte. Ich habe mir Flux.publish() und Flux.cache() angesehen, weiß aber nicht genau, wie ich sie für dieses spezielle Szenario nutzen kann.