Ist dies ein akzeptabler wartbarer Austausch von ManualResetEvent mithilfe von TaskCompletionsource [Duplicate]C#

Ein Treffpunkt für C#-Programmierer
Anonymous
 Ist dies ein akzeptabler wartbarer Austausch von ManualResetEvent mithilfe von TaskCompletionsource [Duplicate]

Post by Anonymous »

Ich habe eine Verarbeitungsschleife, die für die Lebensdauer des Prozesses laufen soll. Ich bin mitten im Konvertieren von einem BlockingCollection in ein Async/Auseait -Mustern mit Channel . Ich möchte die Möglichkeit, diese Schleife zu pausieren und wieder aufzunehmen, ohne darauf zu warten, dass die Warteschlange (der Kanal ) leeren oder verhindern, dass Elemente hinzugefügt werden. Die ursprüngliche Implementierung verwendete ManualReseteVent Aber in meinen Tests und Forschungen wurde ich davon überzeugt, dass ich das nicht verwenden sollte. Ich habe bereits darüber nachgedacht, eine Alternative zu finden, als ein Unit -Test blockiert wurde, da der erwartete nicht warten musste (der Kanal hatte bereits ein Element, sodass ReadAsync im Thread blieb) und er klug auf ein uneingeschränktes HandbuchResetEvent . Da der dort blockierte Thread den Code nicht erreicht hat, der ihn festgelegt hatte. Unten ist der Code, den ich verwendet habe, um die Idee zu testen. Bin ich verrückt danach? Gibt es eine andere Lösung, die ich nicht gesehen habe? Ist dies schlechter als ManualResetEvent in Bezug auf Ressourcen? Irgendwelche Vorschläge? Vielen Dank im Voraus. < /P>

Code: Select all

private Channel _queue;
private int _count;
private TaskCompletionSource _pauseTask;

void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
ct.Register(() => _pauseTask?.TrySetCanceled());
_queue = Channel.CreateBounded(100);

// Simulate the addition of Jobs
Task.Run(async () => { while (!ct.IsCancellationRequested) { await _queue.Writer.WriteAsync(new Job(Interlocked.Increment(ref _count)), ct); Thread.Sleep(1000); }});

var processingTask = ProcessJobs(ct);

// Test pause and resume
Thread.Sleep(3000);
Pause();
Thread.Sleep(3000);
Resume();
// Test pause and cancel
Thread.Sleep(3000);
Pause();
cts.Cancel();
// Check that the cancel worked
processingTask.Wait();

Console.Read();
cts.Cancel();
_queue.Writer.TryComplete();
}

private void Pause()
{
Interlocked.CompareExchange(ref _pauseTask, new TaskCompletionSource(), null);
}

private void Resume()
{
var existing = Interlocked.Exchange(ref _pauseTask, null);
existing?.TrySetResult(null);
}

private async Task ProcessJobs(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var job = await _queue.Reader.ReadAsync(ct);
var pause = _pauseTask?.Task;
if (pause != null)
await pause;
await job.Process();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}

public class Job
{
private readonly int _count;

public Job(int count)
{
_count = count;
}

public async Task Process()
{
// await network communication
await Task.Run(() => Thread.Sleep(100));
Console.WriteLine($"Processing {_count}");
return true;
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post