Code: Select all
private Channel _queue;
private int _count;
private TaskCompletionSource _pauseTask;
void Main()
{
var cts = new CancellationTokenSource();
var ct = cts.Token;
ct.Register(() => _pauseTask?.TrySetCanceled());
_queue = Channel.CreateBounded(100);
// Simulate the addition of Jobs
Task.Run(async () => { while (!ct.IsCancellationRequested) { await _queue.Writer.WriteAsync(new Job(Interlocked.Increment(ref _count)), ct); Thread.Sleep(1000); }});
var processingTask = ProcessJobs(ct);
// Test pause and resume
Thread.Sleep(3000);
Pause();
Thread.Sleep(3000);
Resume();
// Test pause and cancel
Thread.Sleep(3000);
Pause();
cts.Cancel();
// Check that the cancel worked
processingTask.Wait();
Console.Read();
cts.Cancel();
_queue.Writer.TryComplete();
}
private void Pause()
{
Interlocked.CompareExchange(ref _pauseTask, new TaskCompletionSource(), null);
}
private void Resume()
{
var existing = Interlocked.Exchange(ref _pauseTask, null);
existing?.TrySetResult(null);
}
private async Task ProcessJobs(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var job = await _queue.Reader.ReadAsync(ct);
var pause = _pauseTask?.Task;
if (pause != null)
await pause;
await job.Process();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
public class Job
{
private readonly int _count;
public Job(int count)
{
_count = count;
}
public async Task Process()
{
// await network communication
await Task.Run(() => Thread.Sleep(100));
Console.WriteLine($"Processing {_count}");
return true;
}
}