Masstransit plante Wiederaufnahme mit Kafka und Hangfire funktioniert nichtC#

Ein Treffpunkt für C#-Programmierer
Anonymous
 Masstransit plante Wiederaufnahme mit Kafka und Hangfire funktioniert nicht

Post by Anonymous »

Ich versuche, die geplante Neulieferung in Masstransit bei der Verwendung von Kafka als Transport zu konfigurieren. Redelivery mit Hangfire , wie in der Dokumentation vorgeschlagen. /> Ich habe die Anweisungen befolgt und verwendet useLayedErivery mit uswendeduledErivery .
Ich habe auch Hangfire wie in der Dokumentation gezeigt.

Code: Select all

services.AddMassTransit(configurator =>
{
configurator.AddPublishMessageScheduler();
configurator.AddHangfireConsumers();

configurator.SetKebabCaseEndpointNameFormatter();

configurator.UsingInMemory((context, cfg) =>
{
cfg.UsePublishMessageScheduler();
cfg.ConfigureEndpoints(context);
});

configurator.AddRider(rider =>
{
rider.AddConsumer();

rider.UsingKafka((context, factoryConfigurator) =>
{
var kafkaSettings = context.GetRequiredService().Value;

factoryConfigurator.Host(kafkaSettings.Host);

factoryConfigurator.TopicEndpoint(
kafkaSettings.ReputationTopic,
kafkaSettings.ReputationConsumerGroup,
cfg =>
{
cfg.ConfigureConsumer(context);

cfg.UseScheduledRedelivery(r => r.Intervals(ScheduledRedeliveryIntervals));
cfg.UseMessageRetry(r => r.Intervals(ImmediateRetryIntervals));

cfg.UseInMemoryOutbox(context);
});
});
});
});
< /code>
services.AddHangfire(x => x.UsePostgreSqlStorage(options =>
{
var connectionString = configuration.GetConnectionString(postgresSqlConnectionName);
options.UseNpgsqlConnection(connectionString);
})
.SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseSerilogLogProvider()
.UseFilter(new AutomaticRetryAttribute
{
Attempts = 10,
//5sec, 10 sec, 15sec, 30sec, 1min, 5min, 10min, 1h, 12h, 24h
DelaysInSeconds = [5, 10, 15, 30, 60, 300, 600, 1800, 43200, 86400]
}));

services.AddHangfireServer();
< /code>
What happens:

[*]Immediate retries work as expected.
[*]Scheduled redelivery
erstellt einen Hangfire Job (ich kann ihn im Hangfire Dashboard sehen). Dashboard: < /p>

Code: Select all

// Id: #324
using MassTransit.HangfireIntegration;

var scheduleJob = Activate();
await scheduleJob.SendMessage(
FromJson("{
\"HashId\":\"f053000015a25d25b0ec08dddd701bff\",
\"DestinationAddress\":\"loopback://localhost/kafka/main-events-topic\",
\"ContentType\":\"application/vnd.masstransit+json\",
\"ResponseAddress\":\"\",
\"FaultAddress\":\"\",
\"Body\":\"{\r\n  \\\"messageId\\\": \\\"f0530000-15a2-5d25-4ce1-08dddd701c25\\\",\r\n  \\\"requestId\\\": null,\r\n  \\\"correlationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n  \\\"conversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n  \\\"initiatorId\\\": null,\r\n  \\\"sourceAddress\\\": \\\"loopback://localhost/\\\",\r\n  \\\"destinationAddress\\\": null,\r\n  \\\"responseAddress\\\": null,\r\n  \\\"faultAddress\\\": null,\r\n  \\\"messageType\\\": [\r\n    \\\"urn:message:UserService.Domain.Events:BaseEvent\\\"\r\n  ],\r\n  \\\"message\\\": {\r\n    \\\"eventId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n    \\\"eventType\\\": \\\"QuestionUpvote\\\",\r\n    \\\"userId\\\": 12\r\n  },\r\n  \\\"expirationTime\\\": null,\r\n  \\\"sentTime\\\": \\\"2025-08-17T09:26:13.2771324Z\\\",\r\n  \\\"headers\\\": {\r\n    \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n    \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n    \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n    \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n    \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n    \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n    \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n    \\\"MT-Redelivery-Count\\\": 1\r\n  },\r\n  \\\"host\\\": {\r\n    \\\"machineName\\\": \\\"DESKTOP-P6ELB1O\\\",\r\n    \\\"processName\\\": \\\"UserService.Api\\\",\r\n    \\\"processId\\\": 21488,\r\n    \\\"assembly\\\": \\\"UserService.Api\\\",\r\n    \\\"assemblyVersion\\\": \\\"1.0.0.0\\\",\r\n    \\\"frameworkVersion\\\": \\\"9.0.8\\\",\r\n    \\\"massTransitVersion\\\": \\\"8.5.1.0\\\",\r\n    \\\"operatingSystemVersion\\\": \\\"Microsoft Windows NT 10.0.26100.0\\\"\r\n  }\r\n}\",
\"MessageId\":\"f0530000-15a2-5d25-80bf-08dddd701c0e\",
\"CorrelationId\":\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\",
\"ConversationId\":\"44550000-5d25-0015-e878-08dddd6fc501\",
\"TokenId\":\"f0530000-15a2-5d25-b0ec-08dddd701bff\",
\"HeadersAsJson\":\"{\r\n  \\\"MessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n  \\\"CorrelationId\\\": \\\"ca6d9695-f3e2-4f32-934b-24c4af52feb0\\\",\r\n  \\\"ConversationId\\\": \\\"44550000-5d25-0015-e878-08dddd6fc501\\\",\r\n  \\\"DestinationAddress\\\": \\\"loopback://localhost/kafka/main-events-topic\\\",\r\n  \\\"SourceAddress\\\": \\\"loopback://localhost/\\\",\r\n  \\\"MT-Activity-Id\\\": \\\"00-84da22b3a6b6023ddd446a02d845717d-2063eaf79320b845-01\\\",\r\n  \\\"MT-OriginalMessageId\\\": \\\"44550000-5d25-0015-e240-08dddd6fc501\\\",\r\n  \\\"MT-Redelivery-Count\\\": 1\r\n}\",
\"MessageType\":\"urn:message:UserService.Domain.Events:BaseEvent\",
\"Destination\":\"loopback://localhost/kafka/main-events-topic\"
}"),
null);
< /code>
My questions:

Do I need to configure UsePublishMessageScheduler()
sowohl bei Verwendung vonInmemory als auch bei Verwendung von kafka ?
Warum funktioniert Hangfire nur, wenn ich den In-Memory-Scheduler konfigurieren kann. Verbraucher wirklich wiederverkeuert und geliefert?

Code: Select all

public class ReputationEventConsumer : IConsumer
{
public Task Consume(ConsumeContext context)
{
// Throw exception on purpose to trigger retries/redelivery
throw new NotImplementedByDesignException();
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post