Send Saga ist im IOC nicht mehr verfügbarC#

Ein Treffpunkt für C#-Programmierer
Guest
 Send Saga ist im IOC nicht mehr verfügbar

Post by Guest »

Ich habe ein Problem mit Masstransit und der Saga-Funktion. überschreiben ">

Code: Select all

Error: MassTransit.ReceiveTransport[0]
R-FAULT rabbitmq://localhost/monitoring-job-saga 489a0000-4100-0250-1852-08dd500596e7 Shared.MessagingContracts.JobSubmitted MainApp.MonitoringJobState(00:00:00.0334249)
System.NotSupportedException: Send saga is no longer available in IoC
at MassTransit.Configuration.RegistrationServiceCollectionExtensions.TempSagaRepository`1.Send[T](ConsumeContext`1 context, ISagaPolicy`2 policy, IPipe`1 next) in /_/src/MassTransit/DependencyInjection/Configuration/RegistrationServiceCollectionExtensions.cs:Line 93
at MassTransit.Middleware.CorrelatedSagaFilter`2.Send(ConsumeContext`1 context, IPipe`1 next)
< /code>
services.AddMassTransit(x =>
{
x.AddSagaStateMachine()
.InMemoryRepository();

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(new Uri(rabbitHost), h =>
{
h.Username(rabbitUser);
h.Password(rabbitPass);
});

cfg.UseDelayedMessageScheduler();

cfg.ReceiveEndpoint(sagaQueue, e =>
{
e.StateMachineSaga(
context.GetRequiredService(),
context.GetRequiredService());
});
});
});
< /code>
Überwachungjobstatemachine-Klasse: < /p>
using MassTransit;
using MainApp.Data;
using Shared.MessagingContracts;

namespace MainApp
{
public class MonitoringJobStateMachine : MassTransitStateMachine
{
public State Submitted { get; private set; } = null!;
public State Processing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Failed { get; private set; } = null!;

public Event JobSubmittedEvent { get; private set; } = default!;
public Event JobCompletedEvent { get; private set; } = default!;
public Event JobFailedEvent { get; private set; } = default!;

public Schedule JobTimeoutSchedule { get; private set; } = default!;

public MonitoringJobStateMachine()
{
InstanceState(x => x.CurrentState);

Event(() => JobSubmittedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
x.InsertOnInitial = true;
});

Event(() => JobCompletedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});

Event(() => JobFailedEvent, x =>
{
x.CorrelateById(ctx => ctx.Message.CorrelationId);
});

// Konfiguriere den Timeout-Scheduler
Schedule(() => JobTimeoutSchedule, x => x.TimeoutTokenId, s =>
{
s.Delay = TimeSpan.FromSeconds(10);
s.Received = r => r.CorrelateById(ctx => ctx.Message.CorrelationId);
});

// Initialer Übergang: JobSubmitted -> sende ProcessJob und plane Timeout
Initially(
When(JobSubmittedEvent)
.ThenAsync(async context =>
{
// Werte aus der eingehenden Nachricht übernehmen
context.Saga.SubmittedAt = context.Message.Timestamp;
context.Saga.Regions = context.Message.Regions;
context.Saga.CurrentAttempt = 0;
if (context.Saga.Regions != null && context.Saga.Regions.Count > 0)
context.Saga.CurrentRegion = context.Saga.Regions[0];

Console.WriteLine($"[Saga] Job {context.Saga.CorrelationId} submitted. Starting in Region {context.Saga.CurrentRegion}.");

// Sende den ProcessJob-Befehl an die regionalspezifische Queue (z. B.  "jobs-de")
await context.Publish(new ProcessJobCommand
{
CorrelationId = context.Saga.CorrelationId,
Region = context.Saga.CurrentRegion,
Attempt = context.Saga.CurrentAttempt
}, publishContext =>
{
// Setzt explizit die Zieladresse für die Nachricht:
publishContext.DestinationAddress = new Uri($"queue:jobs-{context.Saga.CurrentRegion.ToLower()}");
});
})
// Plane den Timeout direkt in der Kette
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.TransitionTo(Processing)
);

During(Processing,
When(JobCompletedEvent)
.Then(ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} completed successfully in region {ctx.Message.Region}.");
})
.Unschedule(JobTimeoutSchedule)
.TransitionTo(Completed),

When(JobFailedEvent)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Job {ctx.Saga.CorrelationId} failed in region {ctx.Message.Region}. Error: {ctx.Message.ErrorMessage}");
// Erhöhe den Versuchszähler
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
// Setze die neue Region
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");

// Sende den neuen ProcessJob-Befehl an die neue regionale Queue
await ctx.Publish(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
// Plane nach einem Fehlschlag (bzw.  nach Timeout) einen neuen Timeout
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
.IfElse(ctx => ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count,
binder => binder.TransitionTo(Processing),
binder => binder.TransitionTo(Failed)
),

When(JobTimeoutSchedule.Received)
.ThenAsync(async ctx =>
{
Console.WriteLine($"[Saga] Timeout in region {ctx.Saga.CurrentRegion} for job {ctx.Saga.CorrelationId}.");
// Bei Timeout wird der Versuch ebenfalls erhöht und ein Fallback eingeleitet
ctx.Saga.CurrentAttempt++;
if (ctx.Saga.Regions != null && ctx.Saga.CurrentAttempt < ctx.Saga.Regions.Count)
{
ctx.Saga.CurrentRegion = ctx.Saga.Regions[ctx.Saga.CurrentAttempt];
Console.WriteLine($"[Saga] Fallback after timeout: Retrying job {ctx.Saga.CorrelationId} in region {ctx.Saga.CurrentRegion}.");
await ctx.Publish(new ProcessJobCommand
{
CorrelationId = ctx.Saga.CorrelationId,
Region = ctx.Saga.CurrentRegion,
Attempt = ctx.Saga.CurrentAttempt
}, publishContext =>
{
publishContext.DestinationAddress = new Uri($"queue:jobs-{ctx.Saga.CurrentRegion.ToLower()}");
});
}
})
.Schedule(JobTimeoutSchedule,
ctx => new JobTimeoutMessage { CorrelationId = ctx.Saga.CorrelationId },
ctx => TimeSpan.FromSeconds(10))
);
}
}
}

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post