Masstransit mit Rabbitmq: Prioritätsmeldungen, die nicht in korrekter Reihenfolge verarbeitet werdenC#

Ein Treffpunkt für C#-Programmierer
Anonymous
 Masstransit mit Rabbitmq: Prioritätsmeldungen, die nicht in korrekter Reihenfolge verarbeitet werden

Post by Anonymous »

Ich verwende Masstransit mit Rabbitmq als Nachrichtenbus und versuche, eine Reihe von Nachrichten mit Prioritäten mit C#zu senden. Wenn ich jedoch die Stapelanforderung mit Prioritäten sende, werden die Antworten in der gleichen Reihenfolge wie die Nachrichten im Array zurückkehren und nicht die priorisierte Reihenfolge, die ich erwarte. < /P>

Code: Select all

builder.Services.AddMassTransit(x =>
{
x.AddConsumer();

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("url", "username", h =>
{
h.Username("username");
h.Password("password");
});

// Configure priority queue
cfg.ReceiveEndpoint("weather-info-priority-queue", e =>
{
// Enable message priority (0-9)
e.SetQueueArgument("x-max-priority", 10);
// Set lower prefetch count to ensure high-priority messages aren't waiting, this helps to process the messages one by one
e.PrefetchCount = 1;
e.ConcurrentMessageLimit = 1;

e.UseMessageRetry(r => r.Immediate(5));

e.ConfigureConsumer(context);
});
});
});
Mein Programm.cs config

Code: Select all

[HttpGet("batch-test-concurrent")]
public async Task BatchTestConcurrent(CancellationToken cancellationToken)
{
var cities = new[]
{
(City: "Paris", Priority: (byte)2),
(City: "Sydney", Priority: (byte)3),
(City: "Cancun", Priority: (byte)9),
(City: "Madrid", Priority: (byte)5),
(City: "Tokyo", Priority: (byte)1),
(City: "Barcelona", Priority: (byte)1),
(City: "Berlin", Priority: (byte)1)
};

// Send all requests with a small delay between them
var tasks = cities.Select(async (tuple, index) =>
{
var (City, Priority) = tuple;
// Small delay to ensure messages enter the queue separately
await Task.Delay(100 * index, cancellationToken);

_logger.LogInformation("Sending request for {City} with priority {Priority}", City, Priority);

var request = new WeatherInfoRequest
{
City = City,
RequestId = Guid.NewGuid()
};
Console.WriteLine("Sending request for city: {0} with priority {1}", City, Priority);
var response = await _requestClient.GetResponse(
request,
ctx =>
{
ctx.UseExecute(sendContext =>
{
sendContext.SetPriority(Priority);
_logger.LogInformation("Set priority {Priority} for {City}", Priority, City);
});
},
cancellationToken
);

Console.WriteLine("Received response for city: {0}", response.Message.City);
return new
{
response.Message.City,
Temperature = response.Message.TemperatureC,
TemperatureF = 32 + (int)(response.Message.TemperatureC / 0.5556),
Priority
};
});

var results = await Task.WhenAll(tasks);
return Ok(results.ToList());
}
Meine Controller -Funktion, bei der ich den Aufruf
drücke

Code: Select all

using MassTransit;
using MT_Req_Res.Models;

namespace MT_Req_Res.Consumers
{
public class WeatherInfoRequestConsumer : IConsumer
{
private static readonly string[] Summaries =
[
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
];

private readonly ILogger _logger;

public WeatherInfoRequestConsumer(ILogger logger)
{
_logger = logger;
}

public async Task Consume(ConsumeContext  context)
{
_logger.LogInformation("Processing request for city: {City} , RequestId: {RequestId}",
context.Message.City,
context.Message.RequestId);
// To retrieve the AMQP priority, get the RabbitMQ consume context payload
if (context.TryGetPayload(out var rabbitContext))
{
var priority = rabbitContext.Properties?.Priority;
_logger.LogInformation("Message AMQP priority: {Priority}", priority);
}
else
{
_logger.LogInformation("No RabbitMQ context payload found (not RabbitMQ or priority not set).");
}

// Simulate some processing time
await Task.Delay(2000);

// Create a response
var response = new WeatherInfoResponse
{
City = context.Message.City,
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
RequestId = context.Message.RequestId
};

_logger.LogInformation("Sending response for city: {City}, temp: {Temp}°C",
response.City, response.TemperatureC);

// Send the response back
await context.RespondAsync(response);
}
}
}
Mein Verbraucher
Ausgabe:
Wenn ich den API -Endpunkt drücke, nehme ich die Antworten in derselben Reihenfolge wie die Nachrichten im Array zurück:
[
{
"city": "Paris",
"temperature": 36,
"temperatureF": 96,
"priority": 2
},
{
"city": "Sydney",
"temperature": -1,
"temperatureF": 31,
"priority": 3
},
{
"city": "Cancun",
"temperature": 47,
"temperatureF": 116,
"priority": 9
},
{
"city": "Madrid",
"temperature": 40,
"temperatureF": 103,
"priority": 5
},
{
"city": "Tokyo",
"temperature": 50,
"temperatureF": 121,
"priority": 1
},
{
"city": "Barcelona",
"temperature": 21,
"temperatureF": 69,
"priority": 1
},
{
"city": "Berlin",
"temperature": 24,
"temperatureF": 75,
"priority": 1
}
]
< /code>
Die Antworten kommen nicht in der Reihenfolge der Priorität zurück (d. H. "Cancun" mit Priorität 9 sollte zuerst verarbeitet werden, aber nicht). Wie kann ich sicherstellen, dass die Nachrichten in Prioritätsreihenfolge verarbeitet werden?

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post