Code: Select all
public async Task Enqueue(IMessage message)
{
bool result;
try
{
if (_cts.IsCancellationRequested)
{
_logger.LogInformation("{TypeName} cannot be enqueued because the queue is stopped.", message.GetType().Name);
result = false;
}
else
{
await _messageChannel.Writer.WriteAsync(message);
result = true;
_logger.LogInformation("Message {TypeName} written to internal channel.", message.GetType().Name);
}
}
catch (ChannelClosedException)
{
_logger.LogInformation("{TypeName} cannot be enqueued because the channel is closed.", message.GetType().Name);
result = false;
}
catch (Exception e)
{
_logger.LogError(e, "Error enqueuing {TypeName}", message.GetType().Name);
result = false;
}
return result;
}
< /code>
Im Klassenkonstruktor wird eine Message Reader -Schleife mit: < /p>
gestartetTask.Run(() => _MainMessageLoop(_cts.Token));
wobei _mainMessageloop wie:
aussieht
Code: Select all
private async Task _MainMessageLoop(CancellationToken stoppingToken)
{
try
{
var channelReader = _messageChannel.Reader;
while (await channelReader.WaitToReadAsync(stoppingToken).ConfigureAwait(false))
{
try
{
var item = await channelReader.ReadAsync(stoppingToken).ConfigureAwait(false);
_logger.LogInformation("Message {TypeName} read from internal channel.", item.GetType().Name);
await _DoSomethingAsync(item);
}
catch (Exception e)
{
_logger.LogError(e, "Error doing something");
}
}
_logger.LogInformation("Channel reader loop exited {MethodName}.", nameof(_MainMessageLoop));
}
catch (OperationCanceledException)
{
_logger.LogDebug("OperationCanceled {MethodName}", nameof(_MainMessageLoop));
throw;
}
catch (Exception e)
{
_logger.LogError(e, "Error in {MethodName}", nameof(_MainMessageLoop));
throw;
}
}