Java Stream forEachOrdered im parallelen ungeordneten Stream konsumiert in QuellreihenfolgeJava

Java-Forum
Guest
 Java Stream forEachOrdered im parallelen ungeordneten Stream konsumiert in Quellreihenfolge

Post by Guest »

Ich habe kürzlich einen bestimmten Algorithmus implementiert, der einen einzelnen Verbraucher und mehrere Produzenten umfasst und eine Blockierungswarteschlange für die Statusfreigabe verwendet. Während ich eine vereinfachte Variante implementierte, dachte ich, ich könnte sie vielleicht mit einer einfachen Java-Stream-Implementierung zum Laufen bringen.
Aber es funktionierte nicht wie erwartet. Ein Beispiel, das das unerwartete Verhalten reproduziert:

Code: Select all

LongStream.range(0, 32)
.unordered()
.parallel()
.map(value -> {
// 'do some work', which can vary in duration
try {
Thread.sleep(new Random(value).nextLong(1000));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("map " + value);
return value;
}).forEachOrdered(value -> {
// actual implementation writes to output stream
System.out.println(">>> " + value);
System.out.flush();
});
Meine ursprüngliche Idee war, dass jede Karte, sobald sie fertig ist, (ungefähr) vom Terminalbetrieb verbraucht werden könnte. Und forEachOrdered würde im Vergleich zur Verwendung von forEach ein Element vor dem anderen verarbeiten (also keine manuelle Synchronisierung erforderlich). Aber wenn ich den Beispielcode ausführe, erhalte ich die folgende Ausgabe:

Code: Select all

map 7
map 29
map 8
map 11
map 28
map 21
map 20
map 13
map 6
map 10
map 1
map 15
map 5
map 24
map 26
map 14
map 9
map 2
map 31
map 17
map 22
map 16
map 3
map 23
map 12
map 27
map 0
>>> 0
>>> 1
>>> 2
>>> 3
map 4
>>> 4
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> 10
>>> 11
>>> 12
>>> 13
>>> 14
>>> 15
>>> 16
>>> 17
map 25
map 30
map 18
>>> 18
map 19
>>> 19
>>> 20
>>> 21
>>> 22
>>> 23
>>> 24
>>> 25
>>> 26
>>> 27
>>> 28
>>> 29
>>> 30
>>> 31
Wie Sie sehen können, werden die Elemente in der falschen Reihenfolge zugeordnet, die Terminaloperation wird jedoch in der Reihenfolge der Streamquelle aufgerufen. Sie könnten das Problem verschlimmern, wenn Sie beim Erreichen des Werts 0 nur sehr lange schlafen und bei allen anderen Werten nicht schlafen. Die Reihenfolge spielt jedoch keine Rolle.
Das verwirrt mich, weil ich den Stream als ungeordnet erkläre. Und so wie ich es verstehe, wird forEachOrdered nicht unbedingt in der richtigen Reihenfolge verarbeitet, wenn es mit einem ungeordneten Stream verwendet wird.
Das bedeutet natürlich nicht, dass dies nicht möglich ist, also das aktuelle Verhalten, soweit ich es beurteilen kann Tell ist völlig in Ordnung. In meinem Fall einfach nicht verwendbar.
Verstehe ich etwas falsch? Ist es zu erwarten und wenn ja, was verursacht es und warum? Gibt es etwas, wo die Streams-Implementierung verbessert werden könnte?
Update:
Zur Beantwortung der Fragen etwas detaillierter:
In meiner ursprünglichen Implementierung habe ich eine Aufgabenwarteschlange mit Aufgaben, Produzenten-Threads nehmen Aufgaben aus dieser Warteschlange und schieben die Ergebnisse an eine einzelne Verbraucherwarteschlange. Jeder Produzententhread verarbeitet viele Aufgaben und erzeugt daher auch mehrere Werte. Der Consumer-Thread verarbeitet jeweils ein Ergebnis, was wichtig ist, da er in einen Ausgabestream schreibt, sodass die Aufrufe serialisiert/synchronisiert werden sollten.
Für die Stream-Implementierung war meine Idee, dies zu tun Streamen Sie die Aufgaben, lassen Sie die Produzenten die Kartenaufgaben sein und schreiben Sie in einem Terminalaufruf in die Ausgabe. Der Grund, warum ich forEachOrdered verwendet habe, ist, dass es verspricht
, jeweils ein Element zu verarbeiten. Aber es verspricht keine Ordnung, wenn der Stream keine definierte Begegnungsreihenfolge hat, was in Ordnung ist. Dieser letzte Punkt, zusammen mit der Deklaration des Streams als ungeordnet und parallel, brachte mich zu der Annahme, dass dies eine ungeordnete Parallelverarbeitung ermöglichen sollte. Aber das ist nicht der Fall.
Vielleicht ist es einfach ein Implementierungsdetail, warum dies derzeit nicht der Fall ist. Oder vielleicht verstehe ich das falsch. Beachten Sie, dass es wie erwartet funktioniert, wenn ich forEachOrdered in forEach ändere und innerhalb von forEach synchronisiere.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post