Anonymous
MassTransit scheduled redelivery with Kafka and Hangfire not working
by Anonymous » 17 Aug 2025, 12:02
I'm trying to configure scheduled redelivery in MassTransit when using Kafka as the transport, with redelivery handled by Hangfire as suggested in the documentation.
I followed the instructions and used UseScheduledRedelivery instead of UseDelayedRedelivery.
I also configured Hangfire as shown in the documentation.
[code]
services.AddMassTransit(configurator =>
{
    configurator.AddPublishMessageScheduler();
    configurator.AddHangfireConsumers();
    configurator.SetKebabCaseEndpointNameFormatter();

    configurator.UsingInMemory((context, cfg) =>
    {
        cfg.UsePublishMessageScheduler();
        cfg.ConfigureEndpoints(context);
    });

    configurator.AddRider(rider =>
    {
        rider.AddConsumer<ReputationEventConsumer>();

        rider.UsingKafka((context, factoryConfigurator) =>
        {
            var kafkaSettings = context.GetRequiredService().Value;
            factoryConfigurator.Host(kafkaSettings.Host);

            factoryConfigurator.TopicEndpoint(
                kafkaSettings.ReputationTopic,
                kafkaSettings.ReputationConsumerGroup,
                cfg =>
                {
                    cfg.ConfigureConsumer<ReputationEventConsumer>(context);
                    cfg.UseScheduledRedelivery(r => r.Intervals(ScheduledRedeliveryIntervals));
                    cfg.UseMessageRetry(r => r.Intervals(ImmediateRetryIntervals));
                    cfg.UseInMemoryOutbox(context);
                });
        });
    });
});
[/code]
[code]
services.AddHangfire(x => x.UsePostgreSqlStorage(options =>
    {
        var connectionString = configuration.GetConnectionString(postgresSqlConnectionName);
        options.UseNpgsqlConnection(connectionString);
    })
    .SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
    .UseSimpleAssemblyNameTypeSerializer()
    .UseRecommendedSerializerSettings()
    .UseSerilogLogProvider()
    .UseFilter(new AutomaticRetryAttribute
    {
        Attempts = 10,
        // 5s, 10s, 15s, 30s, 1min, 5min, 10min, 30min, 12h, 24h
        DelaysInSeconds = [5, 10, 15, 30, 60, 300, 600, 1800, 43200, 86400]
    }));

services.AddHangfireServer();
[/code]
What happens:
[*]Immediate retries work as expected.
[*]Scheduled redelivery creates a Hangfire job (I can see it in the Hangfire dashboard).
The job as shown in the dashboard:
[code]
// Id: #324
using MassTransit.HangfireIntegration;
var scheduleJob = Activate();
await scheduleJob.SendMessage(
FromJson("{
\"HashId\":\"f053000015a25d25b0ec08dddd701bff\",
\"DestinationAddress\":\"loopback://localhost/kafka/main-events-topic\",
\"ContentType\":\"application/vnd.masstransit+json\",
\"ResponseAddress\":\"\",
\"FaultAddress\":\"\",
\"Body\":\"{\r\n \\\"messageId\\\": \\\"f0530000-15a2-5d25-4ce1-08dddd701c25\\\",\r\n \\\"requestId\\\": null,\r\n \\\"correlationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"conversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"initiatorId\\\": null,\r\n \\\"sourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"destinationAddress\\\": null,\r\n \\\"responseAddress\\\": null,\r\n \\\"faultAddress\\\": null,\r\n \\\"messageType\\\": [\r\n \\\"urn:message:UserService.Domain.Events:BaseEvent\\\"\r\n ],\r\n \\\"message\\\": {\r\n \\\"eventId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"eventType\\\": \\\"QuestionUpvote\\\",\r\n \\\"userId\\\": 12\r\n },\r\n \\\"expirationTime\\\": null,\r\n \\\"sentTime\\\": \\\"2025-08-17T09:26:13.2771324Z\\\",\r\n \\\"headers\\\": {\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n },\r\n \\\"host\\\": {\r\n \\\"machineName\\\": \\\"DESKTOP-P6ELB1O\\\",\r\n \\\"processName\\\": \\\"UserService.Api\\\",\r\n \\\"processId\\\": 21488,\r\n \\\"assembly\\\": \\\"UserService.Api\\\",\r\n \\\"assemblyVersion\\\": \\\"1.0.0.0\\\",\r\n \\\"frameworkVersion\\\": \\\"9.0.8\\\",\r\n \\\"massTransitVersion\\\": \\\"8.5.1.0\\\",\r\n \\\"operatingSystemVersion\\\": \\\"Microsoft Windows NT 10.0.26100.0\\\"\r\n }\r\n}\",
\"MessageId\":\"f0530000-15a2-5d25-80bf-08dddd701c0e\",
\"CorrelationId\":\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\",
\"ConversationId\":\"44550000-5d25-0015-e878-08dddd6fc501\",
\"TokenId\":\"f0530000-15a2-5d25-b0ec-08dddd701bff\",
\"HeadersAsJson\":\"{\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n}\",
\"MessageType\":\"urn:message:UserService.Domain.Events:BaseEvent\",
\"Destination\":\"loopback://localhost/kafka/main-events-topic\"
}"),
null);
[/code]
My questions:
Do I need to configure UsePublishMessageScheduler() both when using the in-memory bus and when using Kafka?
Why does Hangfire only work when I configure the scheduler on the in-memory bus?
Is the message really rescheduled and delivered to the consumer?
[code]
public class ReputationEventConsumer : IConsumer<BaseEvent>
{
    public Task Consume(ConsumeContext<BaseEvent> context)
    {
        // Throw exception on purpose to trigger retries/redelivery
        throw new NotImplementedByDesignException();
    }
}
[/code]