Aber es funktionierte nicht wie erwartet. Ein Beispiel, das das unerwartete Verhalten reproduziert:
Code: Select all
LongStream.range(0, 32)
.unordered()
.parallel()
.map(value -> {
// 'do some work', which can vary in duration
try {
Thread.sleep(new Random(value).nextLong(1000));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("map " + value);
return value;
}).forEachOrdered(value -> {
// actual implementation writes to output stream
System.out.println(">>> " + value);
System.out.flush();
});
Code: Select all
map 7
map 29
map 8
map 11
map 28
map 21
map 20
map 13
map 6
map 10
map 1
map 15
map 5
map 24
map 26
map 14
map 9
map 2
map 31
map 17
map 22
map 16
map 3
map 23
map 12
map 27
map 0
>>> 0
>>> 1
>>> 2
>>> 3
map 4
>>> 4
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> 10
>>> 11
>>> 12
>>> 13
>>> 14
>>> 15
>>> 16
>>> 17
map 25
map 30
map 18
>>> 18
map 19
>>> 19
>>> 20
>>> 21
>>> 22
>>> 23
>>> 24
>>> 25
>>> 26
>>> 27
>>> 28
>>> 29
>>> 30
>>> 31
Das verwirrt mich, weil ich den Stream als ungeordnet erkläre. Und so wie ich es verstehe, wird forEachOrdered nicht unbedingt in der richtigen Reihenfolge verarbeitet, wenn es mit einem ungeordneten Stream verwendet wird.
Das bedeutet natürlich nicht, dass dies nicht möglich ist, also das aktuelle Verhalten, soweit ich es beurteilen kann Tell ist völlig in Ordnung. In meinem Fall einfach nicht verwendbar.
Verstehe ich etwas falsch? Ist es zu erwarten und wenn ja, was verursacht es und warum? Gibt es etwas, wo die Streams-Implementierung verbessert werden könnte?
Update:
Zur Beantwortung der Fragen etwas detaillierter:
In meiner ursprünglichen Implementierung habe ich eine Aufgabenwarteschlange mit Aufgaben, Produzenten-Threads nehmen Aufgaben aus dieser Warteschlange und schieben die Ergebnisse an eine einzelne Verbraucherwarteschlange. Jeder Produzententhread verarbeitet viele Aufgaben und erzeugt daher auch mehrere Werte. Der Consumer-Thread verarbeitet jeweils ein Ergebnis, was wichtig ist, da er in einen Ausgabestream schreibt, sodass die Aufrufe serialisiert/synchronisiert werden sollten.
Für die Stream-Implementierung war meine Idee, dies zu tun Streamen Sie die Aufgaben, lassen Sie die Produzenten die Kartenaufgaben sein und schreiben Sie in einem Terminalaufruf in die Ausgabe. Der Grund, warum ich forEachOrdered verwendet habe, ist, dass es verspricht
, jeweils ein Element zu verarbeiten. Aber es verspricht keine Ordnung, wenn der Stream keine definierte Begegnungsreihenfolge hat, was in Ordnung ist. Dieser letzte Punkt, zusammen mit der Deklaration des Streams als ungeordnet und parallel, brachte mich zu der Annahme, dass dies eine ungeordnete Parallelverarbeitung ermöglichen sollte. Aber das ist nicht der Fall.
Vielleicht ist es einfach ein Implementierungsdetail, warum dies derzeit nicht der Fall ist. Oder vielleicht verstehe ich das falsch. Beachten Sie, dass es wie erwartet funktioniert, wenn ich forEachOrdered in forEach ändere und innerhalb von forEach synchronisiere.