Rxjava vielleichtJava

Java-Forum
Anonymous
 Rxjava vielleicht

Post by Anonymous »

Ich habe ein verwirrendes Timing-Problem mit RXJAVA-Betreibern in einem High-Throughput-Dienst (40 TPS) mit gleichzeitiger Ausführung. Ich habe zwei Klassen, die Ausführungszeiten log, aber sie zeigen sehr unterschiedliche Zeiten für die gleiche Operation.

Code: Select all

Class A (the actual implementation):

Code: Select all

public Maybe execute(Input input) {
long start = System.nanoTime();

return Maybe.fromCallable(() -> {
// Some processing that takes ~500ms
return callExternalService();
})
.doOnSuccess(decision -> {
log.info("Implementation took {} ms",
((System.nanoTime() - start) / 1000000)); // Logs ~500ms
});
}

< /code>
Class B (a decorator that wraps Class A):

Code: Select all

public Maybe execute(Input input) {
return Maybe.defer(() -> {
long startTime = System.nanoTime();

return delegate.execute(input)  // Calls Class A
.doOnSuccess(decision -> {
// Other success logic
})
.doFinally(() -> {
log.info("Strategy took {} ms",
((System.nanoTime() - startTime) / 1000000)); // Logs ~1300ms
});
});
}
< /code>
[b]The Timing Issue[/b]
Class A's doOnSuccess
: Protokolle bei ~ 500 ms ✓ (erwartet)
Klasse Bs dofinally : Protokolle bei ~ 1300 ms ❌ (800 ms später als erwartet)
Die dofinal. den gleichen Betrieb messen. Das Protokoll von dofinal wird während der nachgeschalteten Verarbeitung irgendwo dazwischen gedruckt (nicht am Ende). Die Strategien werden von einem Testamentsvollstrecker ausgeführt. Alle Strategien mit hoher Priorität werden zuerst parallel ausgeführt, gefolgt von Medium und dann der niedrigen Priorität. Klasse A war eine am Ende durchgeführte Strategie mit niedriger Priorität. < /P>

Code: Select all

@Singleton
public class StrategyExecutor {
private final Executor executor; // ThreadContextAwareExecutor with cached thread pool

public Maybe strategyExecute(Request request, /* other params */) {
return Maybe.defer(() -> {
return Observable.fromArray(HIGH, MEDIUM, LOW)
.concatMapMaybe(priority -> {
List strategies = strategyMap.get(priority);

// Convert each strategy to Maybe and run concurrently
List maybesFromStrategies = strategies.stream()
.map(strategy -> strategy.execute(input)
.subscribeOn(Schedulers.from(executor))
.onErrorComplete() // Graceful error handling
.filter(Decision::isValid))
.collect(Collectors.toList());

return Maybe.merge(maybesFromStrategies)
.firstElement();
})
.firstElement();
});
}
}

< /code>

[*]  Umgebung mit hoher Durchsatz: < /p>
< /li>
  Läuft bei 40 tps (Transaktionen pro Sekunde) < /p> < /> < /li>
Executor Used - Executors.newCachedThreadPool()
Das Ergebnis des Strategie-Executors wird von einer Klasse der obersten Ebene konsumiert, die die nachgelagerten Dienste aufruft. Nachgeschaltete Dienste sind hoch latente Dienste < /p>

Code: Select all

public Observable processRequest(Request request) {
return strategyExecutor.strategyExecute(request, /* params */)  // Returns Maybe
.toObservable()
.flatMap(decision -> {
Service selectedService = serviceMap.get(decision.serviceName());
return selectedService.process(request);
});
}
< /code>
[b]Threading Concerns[/b]
Since we're running at high throughput with concurrent requests:

Could thread scheduling delays cause doFinally to execute much later?

[*]Is doFinally
Warten Sie auf einen Teil der nachgeschalteten Kette vor der Ausführung? (Dies geschieht bei Strategien mit hoher/mittlerer Priorität nicht)

In einer multithread-Umgebung unterscheidet sich dofinales Verhalten von einer einzelnen Thread-Ausführung? Vielleicht. TPs wirken sich aus, wenn dofinal ausgeführt wird? Zeit (nicht nachgeschaltete Verarbeitung) in diesem gleichzeitigen Zusammenführungsszenario? Wenn ich TimeInterval () im Strategie -Executor verwende und ein Doonsuccess zum Protokoll verwendet, funktioniert es wie erwartet.>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post