Code: Select all
Class A (the actual implementation):
Code: Select all
public Maybe execute(Input input) {
long start = System.nanoTime();
return Maybe.fromCallable(() -> {
// Some processing that takes ~500ms
return callExternalService();
})
.doOnSuccess(decision -> {
log.info("Implementation took {} ms",
((System.nanoTime() - start) / 1000000)); // Logs ~500ms
});
}
< /code>
Class B (a decorator that wraps Class A):
Code: Select all
public Maybe execute(Input input) {
return Maybe.defer(() -> {
long startTime = System.nanoTime();
return delegate.execute(input) // Calls Class A
.doOnSuccess(decision -> {
// Other success logic
})
.doFinally(() -> {
log.info("Strategy took {} ms",
((System.nanoTime() - startTime) / 1000000)); // Logs ~1300ms
});
});
}
< /code>
[b]The Timing Issue[/b]
Class A's doOnSuccess
Klasse Bs dofinally : Protokolle bei ~ 1300 ms
Die dofinal. den gleichen Betrieb messen. Das Protokoll von dofinal wird während der nachgeschalteten Verarbeitung irgendwo dazwischen gedruckt (nicht am Ende). Die Strategien werden von einem Testamentsvollstrecker ausgeführt. Alle Strategien mit hoher Priorität werden zuerst parallel ausgeführt, gefolgt von Medium und dann der niedrigen Priorität. Klasse A war eine am Ende durchgeführte Strategie mit niedriger Priorität. < /P>
Code: Select all
@Singleton
public class StrategyExecutor {
private final Executor executor; // ThreadContextAwareExecutor with cached thread pool
public Maybe strategyExecute(Request request, /* other params */) {
return Maybe.defer(() -> {
return Observable.fromArray(HIGH, MEDIUM, LOW)
.concatMapMaybe(priority -> {
List strategies = strategyMap.get(priority);
// Convert each strategy to Maybe and run concurrently
List maybesFromStrategies = strategies.stream()
.map(strategy -> strategy.execute(input)
.subscribeOn(Schedulers.from(executor))
.onErrorComplete() // Graceful error handling
.filter(Decision::isValid))
.collect(Collectors.toList());
return Maybe.merge(maybesFromStrategies)
.firstElement();
})
.firstElement();
});
}
}
< /code>
[*] Umgebung mit hoher Durchsatz: < /p>
< /li>
Läuft bei 40 tps (Transaktionen pro Sekunde) < /p> < /> < /li>
Executor Used - Executors.newCachedThreadPool()
Code: Select all
public Observable processRequest(Request request) {
return strategyExecutor.strategyExecute(request, /* params */) // Returns Maybe
.toObservable()
.flatMap(decision -> {
Service selectedService = serviceMap.get(decision.serviceName());
return selectedService.process(request);
});
}
< /code>
[b]Threading Concerns[/b]
Since we're running at high throughput with concurrent requests:
Could thread scheduling delays cause doFinally to execute much later?
[*]Is doFinally
In einer multithread-Umgebung unterscheidet sich dofinales Verhalten von einer einzelnen Thread-Ausführung? Vielleicht. TPs wirken sich aus, wenn dofinal ausgeführt wird? Zeit (nicht nachgeschaltete Verarbeitung) in diesem gleichzeitigen Zusammenführungsszenario? Wenn ich TimeInterval () im Strategie -Executor verwende und ein Doonsuccess zum Protokoll verwendet, funktioniert es wie erwartet.>