Parallel Flux Blocking CallJava

Java-Forum
Anonymous
 Parallel Flux Blocking Call

Post by Anonymous »

Mein Anwendungsaufbau wird als Teil der korrekten Methode für die korrekte Art der Spring Webclient in Spring AMQP
erwähnt, in der ich versuche, den Spring Webclient zu verwenden, um API -Anrufe im Frühjahr AMQP Rabbit MQ -Verbraucher -Threads zu tätigen. Das folgende minimalistische Setup -< /p>
Verwendete Abhängigkeiten < /p>

Code: Select all

Spring boot 2.2.6.RELEASE
spring-boot-starter-web
spring-boot-starter-webflux
reactor-netty 0.9.14.RELEASE
< /code>
Wie in der anderen verknüpften Ausgabe erwähnt, finden Sie unten die Konfiguration für WebClient -< /p>
@Bean
public WebClient webClient() {
ConnectionProvider connectionProvider =
ConnectionProvider.builder("fixed")
.lifo()
.pendingAcquireTimeout(Duration.ofMillis(200000))
.maxConnections(100)
.pendingAcquireMaxCount(3000)
.maxIdleTime(Duration.ofMillis(290000))
.build();

HttpClient client = HttpClient.create(connectionProvider);
client.tcpConfiguration(
>);

Webclient.Builder builder =
Webclient.builder()
.baseUrl(>)
.clientConnector(new ReactorClientHttpConnector(client));

return builder.build();
}
< /code>
unten ist @Service -Klasse mit parallelen Flux -Webclient -Aufrufen -< /p>
@Service
public class FluxtestService {
public Flux getFlux(List reqList) {
return Flux.fromIterable(reqList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(s
-> {return webClient.method(POST)
.uri(>)
.body(BodyInserters.fromValue(s))
.exchange()
.flatMap(response -> {
if (response.statusCode().isError()) {
return Mono.just(new Response());
}
return response.bodyToMono(Response.class);
})})
.sequential();
}
}
}
Um den Spring AMQP Rabbit MQ Consumer/Listener zu simulieren, habe ich das unten stehende @restController -
erstellt.@RestController public class FluxTestController

@Autowired private FluxtestService service;

@PostMapping("/fluxtest")
public List getFlux(List reqlist) {
return service.getFlux(reqlist).collectList().block();
}
< /code>
Ich habe versucht, Anforderungen von JMeter mit rund 15 Threads zu entlassen. Die ersten Anfragen werden sehr schnell verarbeitet. Während Anfragen erteilt werden, kann ich den folgenden Satz von Protokollen in der Protokolldatei sehen-< /p>
Channel cleaned, now 32 active connections and 68 inactive connections
< /code>
Sobald ich weitere Anfragensätze eingereicht habe, steigen die aktiven Verbindungen immer wieder, bis sie das von 100 konfigurierte max. Bis zu diesem Zeitpunkt ist die Antwortzeit in Ordnung. < /P>
, aber alle nachfolgenden Anforderungen dauern sehr lange. Außerdem sehe ich nicht, dass die aktiven Verbindungen viel reduzieren, obwohl keine Anfragen abgefeuert werden.reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 200000 ms
< /code>
Dies zeigt wahrscheinlich, dass die nachgeschaltete Verbindung nicht veröffentlicht wird. Bitte helfen Sie bei diesem Problem und möglichen Korrekturen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post