Anonymous
Masstransit plante Wiederaufnahme mit Kafka und Hangfire funktioniert nicht
Post
by Anonymous » 18 Aug 2025, 21:48
Ich versuche, die geplante Neulieferung in Masstransit bei der Verwendung von Kafka als Transport zu konfigurieren. Redelivery mit Hangfire , wie in der Dokumentation vorgeschlagen. /> Ich habe die Anweisungen befolgt und verwendet useLayedErivery mit uswendeduledReDeLivery .
Code: Select all
services.AddMassTransit(configurator =>
{
configurator.AddPublishMessageScheduler();
configurator.AddHangfireConsumers();
configurator.SetKebabCaseEndpointNameFormatter();
configurator.UsingInMemory((context, cfg) =>
{
cfg.UsePublishMessageScheduler();
cfg.ConfigureEndpoints(context);
});
configurator.AddRider(rider =>
{
rider.AddConsumer();
rider.UsingKafka((context, factoryConfigurator) =>
{
var kafkaSettings = context.GetRequiredService().Value;
factoryConfigurator.Host(kafkaSettings.Host);
factoryConfigurator.TopicEndpoint(
kafkaSettings.ReputationTopic,
kafkaSettings.ReputationConsumerGroup,
cfg =>
{
cfg.ConfigureConsumer(context);
cfg.UseScheduledRedelivery(r => r.Intervals(ScheduledRedeliveryIntervals));
cfg.UseMessageRetry(r => r.Intervals(ImmediateRetryIntervals));
cfg.UseInMemoryOutbox(context);
});
});
});
});
< /code>
services.AddHangfire(x => x.UsePostgreSqlStorage(options =>
{
var connectionString = configuration.GetConnectionString(postgresSqlConnectionName);
options.UseNpgsqlConnection(connectionString);
})
.SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseSerilogLogProvider()
.UseFilter(new AutomaticRetryAttribute
{
Attempts = 10,
//5sec, 10 sec, 15sec, 30sec, 1min, 5min, 10min, 1h, 12h, 24h
DelaysInSeconds = [5, 10, 15, 30, 60, 300, 600, 1800, 43200, 86400]
}));
services.AddHangfireServer();
< /code>
Was passiert: < /h2>
[*] Sofortige Wiederholungen funktionieren wie erwartet. < /li>
Scheduled redelivery
erstellt einen Hangfire Job (ich kann ihn im Hangfire Dashboard sehen). Dashboard: < /p>
Code: Select all
// Id: #324
using MassTransit.HangfireIntegration;
var scheduleJob = Activate();
await scheduleJob.SendMessage(
FromJson("{
\"HashId\":\"f053000015a25d25b0ec08dddd701bff\",
\"DestinationAddress\":\"loopback://localhost/kafka/main-events-topic\",
\"ContentType\":\"application/vnd.masstransit+json\",
\"ResponseAddress\":\"\",
\"FaultAddress\":\"\",
\"Body\":\"{\r\n \\\"messageId\\\": \\\"f0530000-15a2-5d25-4ce1-08dddd701c25\\\",\r\n \\\"requestId\\\": null,\r\n \\\"correlationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"conversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"initiatorId\\\": null,\r\n \\\"sourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"destinationAddress\\\": null,\r\n \\\"responseAddress\\\": null,\r\n \\\"faultAddress\\\": null,\r\n \\\"messageType\\\": [\r\n \\\"urn:message:UserService.Domain.Events:BaseEvent\\\"\r\n ],\r\n \\\"message\\\": {\r\n \\\"eventId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"eventType\\\": \\\"QuestionUpvote\\\",\r\n \\\"userId\\\": 12\r\n },\r\n \\\"expirationTime\\\": null,\r\n \\\"sentTime\\\": \\\"2025-08-17T09:26:13.2771324Z\\\",\r\n \\\"headers\\\": {\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n },\r\n \\\"host\\\": {\r\n \\\"machineName\\\": \\\"DESKTOP-P6ELB1O\\\",\r\n \\\"processName\\\": \\\"UserService.Api\\\",\r\n \\\"processId\\\": 21488,\r\n \\\"assembly\\\": \\\"UserService.Api\\\",\r\n \\\"assemblyVersion\\\": \\\"1.0.0.0\\\",\r\n \\\"frameworkVersion\\\": \\\"9.0.8\\\",\r\n \\\"massTransitVersion\\\": \\\"8.5.1.0\\\",\r\n \\\"operatingSystemVersion\\\": \\\"Microsoft Windows NT 10.0.26100.0\\\"\r\n }\r\n}\",
\"MessageId\":\"f0530000-15a2-5d25-80bf-08dddd701c0e\",
\"CorrelationId\":\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\",
\"ConversationId\":\"44550000-5d25-0015-e878-08dddd6fc501\",
\"TokenId\":\"f0530000-15a2-5d25-b0ec-08dddd701bff\",
\"HeadersAsJson\":\"{\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n}\",
\"MessageType\":\"urn:message:UserService.Domain.Events:BaseEvent\",
\"Destination\":\"loopback://localhost/kafka/main-events-topic\"
}"),
null);
Meine Fragen:
Muss ich die Verwendung von UsePublishMessagesschen () Beide in Verwendung vonInmemory und in der Verwendung von KAFKA
als Hangfire
1755546539
Anonymous
Ich versuche, die geplante Neulieferung in Masstransit bei der Verwendung von Kafka als Transport zu konfigurieren. Redelivery mit Hangfire , wie in der Dokumentation vorgeschlagen. /> Ich habe die Anweisungen befolgt und verwendet useLayedErivery mit uswendeduledReDeLivery .[code]services.AddMassTransit(configurator => { configurator.AddPublishMessageScheduler(); configurator.AddHangfireConsumers(); configurator.SetKebabCaseEndpointNameFormatter(); configurator.UsingInMemory((context, cfg) => { cfg.UsePublishMessageScheduler(); cfg.ConfigureEndpoints(context); }); configurator.AddRider(rider => { rider.AddConsumer(); rider.UsingKafka((context, factoryConfigurator) => { var kafkaSettings = context.GetRequiredService().Value; factoryConfigurator.Host(kafkaSettings.Host); factoryConfigurator.TopicEndpoint( kafkaSettings.ReputationTopic, kafkaSettings.ReputationConsumerGroup, cfg => { cfg.ConfigureConsumer(context); cfg.UseScheduledRedelivery(r => r.Intervals(ScheduledRedeliveryIntervals)); cfg.UseMessageRetry(r => r.Intervals(ImmediateRetryIntervals)); cfg.UseInMemoryOutbox(context); }); }); }); }); < /code> services.AddHangfire(x => x.UsePostgreSqlStorage(options => { var connectionString = configuration.GetConnectionString(postgresSqlConnectionName); options.UseNpgsqlConnection(connectionString); }) .SetDataCompatibilityLevel(CompatibilityLevel.Version_180) .UseSimpleAssemblyNameTypeSerializer() .UseRecommendedSerializerSettings() .UseSerilogLogProvider() .UseFilter(new AutomaticRetryAttribute { Attempts = 10, //5sec, 10 sec, 15sec, 30sec, 1min, 5min, 10min, 1h, 12h, 24h DelaysInSeconds = [5, 10, 15, 30, 60, 300, 600, 1800, 43200, 86400] })); services.AddHangfireServer(); < /code> Was passiert: < /h2> [*] Sofortige Wiederholungen funktionieren wie erwartet. < /li> Scheduled redelivery[/code] erstellt einen Hangfire Job (ich kann ihn im Hangfire Dashboard sehen). Dashboard: < /p> [code]// Id: #324 using MassTransit.HangfireIntegration; var scheduleJob = Activate(); await scheduleJob.SendMessage( FromJson("{ \"HashId\":\"f053000015a25d25b0ec08dddd701bff\", \"DestinationAddress\":\"loopback://localhost/kafka/main-events-topic\", \"ContentType\":\"application/vnd.masstransit+json\", \"ResponseAddress\":\"\", \"FaultAddress\":\"\", \"Body\":\"{\r\n \\\"messageId\\\": \\\"f0530000-15a2-5d25-4ce1-08dddd701c25\\\",\r\n \\\"requestId\\\": null,\r\n \\\"correlationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"conversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"initiatorId\\\": null,\r\n \\\"sourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"destinationAddress\\\": null,\r\n \\\"responseAddress\\\": null,\r\n \\\"faultAddress\\\": null,\r\n \\\"messageType\\\": [\r\n \\\"urn:message:UserService.Domain.Events:BaseEvent\\\"\r\n ],\r\n \\\"message\\\": {\r\n \\\"eventId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"eventType\\\": \\\"QuestionUpvote\\\",\r\n \\\"userId\\\": 12\r\n },\r\n \\\"expirationTime\\\": null,\r\n \\\"sentTime\\\": \\\"2025-08-17T09:26:13.2771324Z\\\",\r\n \\\"headers\\\": {\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n },\r\n \\\"host\\\": {\r\n \\\"machineName\\\": \\\"DESKTOP-P6ELB1O\\\",\r\n \\\"processName\\\": \\\"UserService.Api\\\",\r\n \\\"processId\\\": 21488,\r\n \\\"assembly\\\": \\\"UserService.Api\\\",\r\n \\\"assemblyVersion\\\": \\\"1.0.0.0\\\",\r\n \\\"frameworkVersion\\\": \\\"9.0.8\\\",\r\n \\\"massTransitVersion\\\": \\\"8.5.1.0\\\",\r\n \\\"operatingSystemVersion\\\": \\\"Microsoft Windows NT 10.0.26100.0\\\"\r\n }\r\n}\", \"MessageId\":\"f0530000-15a2-5d25-80bf-08dddd701c0e\", \"CorrelationId\":\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\", \"ConversationId\":\"44550000-5d25-0015-e878-08dddd6fc501\", \"TokenId\":\"f0530000-15a2-5d25-b0ec-08dddd701bff\", \"HeadersAsJson\":\"{\r\n \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n \\\"MT-Redelivery-Count\\\": 1\r\n}\", \"MessageType\":\"urn:message:UserService.Domain.Events:BaseEvent\", \"Destination\":\"loopback://localhost/kafka/main-events-topic\" }"), null); [/code] Meine Fragen: Muss ich die Verwendung von UsePublishMessagesschen () Beide in Verwendung vonInmemory und in der Verwendung von KAFKA als Hangfire