Ich verwende Masstransit mit Rabbitmq als Nachrichtenbus und versuche, eine Reihe von Nachrichten mit Prioritäten mit C#zu senden. Wenn ich jedoch die Stapelanforderung mit Prioritäten sende, werden die Antworten in der gleichen Reihenfolge wie die Nachrichten im Array zurückkehren und nicht die priorisierte Reihenfolge, die ich erwarte. < /P>
using MassTransit;
using MT_Req_Res.Models;
namespace MT_Req_Res.Consumers
{
public class WeatherInfoRequestConsumer : IConsumer
{
private static readonly string[] Summaries =
[
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
];
private readonly ILogger _logger;
public WeatherInfoRequestConsumer(ILogger logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext context)
{
_logger.LogInformation("Processing request for city: {City} , RequestId: {RequestId}",
context.Message.City,
context.Message.RequestId);
// To retrieve the AMQP priority, get the RabbitMQ consume context payload
if (context.TryGetPayload(out var rabbitContext))
{
var priority = rabbitContext.Properties?.Priority;
_logger.LogInformation("Message AMQP priority: {Priority}", priority);
}
else
{
_logger.LogInformation("No RabbitMQ context payload found (not RabbitMQ or priority not set).");
}
// Simulate some processing time
await Task.Delay(2000);
// Create a response
var response = new WeatherInfoResponse
{
City = context.Message.City,
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)],
RequestId = context.Message.RequestId
};
_logger.LogInformation("Sending response for city: {City}, temp: {Temp}°C",
response.City, response.TemperatureC);
// Send the response back
await context.RespondAsync(response);
}
}
}
Mein Verbraucher
Ausgabe:
Wenn ich den API -Endpunkt drücke, nehme ich die Antworten in derselben Reihenfolge wie die Nachrichten im Array zurück:
[
{
"city": "Paris",
"temperature": 36,
"temperatureF": 96,
"priority": 2
},
{
"city": "Sydney",
"temperature": -1,
"temperatureF": 31,
"priority": 3
},
{
"city": "Cancun",
"temperature": 47,
"temperatureF": 116,
"priority": 9
},
{
"city": "Madrid",
"temperature": 40,
"temperatureF": 103,
"priority": 5
},
{
"city": "Tokyo",
"temperature": 50,
"temperatureF": 121,
"priority": 1
},
{
"city": "Barcelona",
"temperature": 21,
"temperatureF": 69,
"priority": 1
},
{
"city": "Berlin",
"temperature": 24,
"temperatureF": 75,
"priority": 1
}
]
< /code>
Die Antworten kommen nicht in der Reihenfolge der Priorität zurück (d. H. "Cancun" mit Priorität 9 sollte zuerst verarbeitet werden, aber nicht). Wie kann ich sicherstellen, dass die Nachrichten in Prioritätsreihenfolge verarbeitet werden?
Ich verwende Masstransit mit Rabbitmq als Nachrichtenbus und versuche, eine Reihe von Nachrichten mit Prioritäten mit C#zu senden. Wenn ich jedoch die Stapelanforderung mit Prioritäten sende, werden die Antworten in der gleichen Reihenfolge wie die Nachrichten im Array zurückkehren und nicht die priorisierte Reihenfolge, die ich erwarte. < /P> [code]builder.Services.AddMassTransit(x => { x.AddConsumer();
// Configure priority queue cfg.ReceiveEndpoint("weather-info-priority-queue", e => { // Enable message priority (0-9) e.SetQueueArgument("x-max-priority", 10); // Set lower prefetch count to ensure high-priority messages aren't waiting, this helps to process the messages one by one e.PrefetchCount = 1; e.ConcurrentMessageLimit = 1;
// Send all requests with a small delay between them var tasks = cities.Select(async (tuple, index) => { var (City, Priority) = tuple; // Small delay to ensure messages enter the queue separately await Task.Delay(100 * index, cancellationToken);
_logger.LogInformation("Sending request for {City} with priority {Priority}", City, Priority);
var request = new WeatherInfoRequest { City = City, RequestId = Guid.NewGuid() }; Console.WriteLine("Sending request for city: {0} with priority {1}", City, Priority); var response = await _requestClient.GetResponse( request, ctx => { ctx.UseExecute(sendContext => { sendContext.SetPriority(Priority); _logger.LogInformation("Set priority {Priority} for {City}", Priority, City); }); }, cancellationToken );
Console.WriteLine("Received response for city: {0}", response.Message.City); return new { response.Message.City, Temperature = response.Message.TemperatureC, TemperatureF = 32 + (int)(response.Message.TemperatureC / 0.5556), Priority }; });
var results = await Task.WhenAll(tasks); return Ok(results.ToList()); } [/code] [b] Meine Controller -Funktion, bei der ich den Aufruf [/b] drücke[code]using MassTransit; using MT_Req_Res.Models;
public WeatherInfoRequestConsumer(ILogger logger) { _logger = logger; }
public async Task Consume(ConsumeContext context) { _logger.LogInformation("Processing request for city: {City} , RequestId: {RequestId}", context.Message.City, context.Message.RequestId); // To retrieve the AMQP priority, get the RabbitMQ consume context payload if (context.TryGetPayload(out var rabbitContext)) { var priority = rabbitContext.Properties?.Priority; _logger.LogInformation("Message AMQP priority: {Priority}", priority); } else { _logger.LogInformation("No RabbitMQ context payload found (not RabbitMQ or priority not set)."); }
// Simulate some processing time await Task.Delay(2000);
// Create a response var response = new WeatherInfoResponse { City = context.Message.City, TemperatureC = Random.Shared.Next(-20, 55), Summary = Summaries[Random.Shared.Next(Summaries.Length)], RequestId = context.Message.RequestId };
_logger.LogInformation("Sending response for city: {City}, temp: {Temp}°C", response.City, response.TemperatureC);
// Send the response back await context.RespondAsync(response); } } } [/code] [b] Mein Verbraucher [/b] Ausgabe: Wenn ich den API -Endpunkt drücke, nehme ich die Antworten in derselben Reihenfolge wie die Nachrichten im Array zurück: [ { "city": "Paris", "temperature": 36, "temperatureF": 96, "priority": 2 }, { "city": "Sydney", "temperature": -1, "temperatureF": 31, "priority": 3 }, { "city": "Cancun", "temperature": 47, "temperatureF": 116, "priority": 9 }, { "city": "Madrid", "temperature": 40, "temperatureF": 103, "priority": 5 }, { "city": "Tokyo", "temperature": 50, "temperatureF": 121, "priority": 1 }, { "city": "Barcelona", "temperature": 21, "temperatureF": 69, "priority": 1 }, { "city": "Berlin", "temperature": 24, "temperatureF": 75, "priority": 1 } ] < /code> Die Antworten kommen nicht in der Reihenfolge der Priorität zurück (d. H. "Cancun" mit Priorität 9 sollte zuerst verarbeitet werden, aber nicht). Wie kann ich sicherstellen, dass die Nachrichten in Prioritätsreihenfolge verarbeitet werden?
Wir verwenden Camel für Rabbitmq -Hersteller und Verbraucher. Integration
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
Bitte lassen Sie uns wissen, ob es...
public async Task GetAllCompanyTeachersAsync(string companyId, int skip, int take)
{
var response = await _teacherRequest.GetResponse(new GetAllCompanyTeacherDataRequest
{
CompanyId = companyId,...
Ich versuche, MassTransit so zu konfigurieren, dass eine benutzerdefinierte Deserialisierungsstrategie für eine EventPlanCreated-Nachricht verwendet wird, wenn Nachrichten von Azure Service Bus...
Ich habe einen Azure EventHub mit Nachrichten ohne Umschlag. Sie sind ein „Umschlag“, aber sie haben mein eigenes Format. Den Beispielen, die ich online sehe, zufolge sollte dies die richtige...