< /blockquote>
Es gibt keine solche Option oder Überlastung für die asynchrone Parallel.foreachasync < /code>. Und das ist ein Problem für mich, da ich diese Methode mit einem Kanal als Quelle in einem Produzenten-Verbraucher-Szenario als Verbraucher verwenden möchte. In meinem Szenario ist es wichtig, dass der Verbraucher genau das beißt, was er kauen kann, und nicht mehr. Ich möchte nicht, dass der Verbraucher den Kanal aggressiv zieht und dann die gezogenen Elemente in seinen persönlichen versteckten Puffer einfügt. Ich möchte > Bis vor kurzem hatte ich den Eindruck, dass die parallel.foreachaSync -Methode nicht von Design puffern. Aber um sicherzugehen, dass ich Microsoft auf Github um eine Klarstellung gefragt habe. Ich habe sehr schnell Feedback erhalten, aber nicht das, was ich erwartet hatte: < /p>
Es handelt sich um ein Implementierungsdetail. Mit parallel.foreach < / code> wird die Pufferung durchgeführt, um Körperdelegierte zu verarbeiten, die möglicherweise sehr schnell sind, und es wird daher versucht, die Kosten für die Einnahme des Schlosses für den Zugriff auf den gemeinsam genutzten Enumerator zu minimieren / zu amortisieren. Mit foreachaSync wird erwartet, dass die Körperdelegierten zumindest ein bisschen fleischiger sein werden und daher nicht versucht, eine solche Amortisation durchzuführen. Zumindest nicht heute. Ich muss also meinen Ansatz überdenken. Verhalten? Wenn ja, wie? Ich bitte um eine Art dünne Wrapper um die vorhandene Parallel. So etwas: < /p>
Code: Select all
public static Task ForEachAsync_NoBuffering(
IAsyncEnumerable source,
ParallelOptions parallelOptions,
Func body)
{
// Some magic here
return Parallel.ForEachAsync(source, parallelOptions, body);
}
< /code>
Der Wrapper sollte sich genau mit der parallel.foreachaSync < /code> -Methode auf .NET genau verhalten. p> [b] Update: [/b] Hier ist das grundlegende Layout meines Szenarios:
class Processor
{
private readonly Channel _channel;
private readonly Task _consumer;
public Processor()
{
_channel = Channel.CreateUnbounded();
_consumer = StartConsumer();
}
public int PendingItemsCount => _channel.Reader.Count;
public Task Completion => _consumer;
public void QueueItem(Item item) => _channel.Writer.TryWrite(item);
private async Task StartConsumer()
{
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(_channel.Reader.ReadAllAsync(), options, async (item, _) =>
{
// Call async API
// Persist the response of the API in an RDBMS
});
}
}