Я использую последнюю версию Rebus (0.99.35) вместе с SimpleInjector (3.1.2). В моем первом примере проекта я использую SQL Server для транспорта и Sagas.Rebus Saga срабатывает IAmInitiatedBy несколько раз
Проблема заключается в том, что метод Saga Handle(StartTheSagaMessage message)
, реализующий IAmInitiatedBy<StartTheSagaMessage>
, называется 5 раз, и я не могу понять, почему. Кроме того, этот метод публикует сообщение, которое никогда не принимается шиной.
Вот код для конфигурации:
var container = new Container();
var assemblies = AppDomain.CurrentDomain.GetAssemblies()
.Where(i => i.FullName.StartsWith("Messages"));
container.RegisterCollection(typeof(IHandleMessages<>), assemblies);
var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
.Logging(l => l.Trace())
.Transport(t => t.UseSqlServer(connectionstring, "Messages", "consumer"))
.Routing(r => r.TypeBased().MapAssemblyOf<Job>("consumer"))
.Sagas(s => s.StoreInSqlServer(connectionstring, "Sagas", "SagaIndexTable"))
.Options(o =>
{
o.SetNumberOfWorkers(1);
o.SetMaxParallelism(1);
})
.Start();
container.Verify();
bus.Subscribe<Step1FinishedMessage>().Wait();
bus.Subscribe<Step2FinishedMessage>().Wait();
var procId = Guid.NewGuid();
bus.Send(new StartTheSagaMessage() { ProcessId = procId });
И Saga Код:
public class MySaga : Saga<MySagaData>,
IAmInitiatedBy<StartTheSagaMessage>,
IHandleMessages<Step1FinishedMessage>,
IHandleMessages<Step2FinishedMessage>
{
public IBus Bus { get; set; }
protected override void CorrelateMessages(ICorrelationConfig<MySagaData> config)
{
config.Correlate<StartTheSagaMessage>(m => m.ProcessId, s => s.SagaProcessId);
config.Correlate<Step1FinishedMessage>(m => m.ProcessId, s => s.SagaProcessId);
config.Correlate<Step2FinishedMessage>(m => m.ProcessId, s => s.SagaProcessId);
}
public async Task Handle(StartTheSagaMessage message)
{
if (IsNew == false)
return;
Trace.TraceInformation("Mysaga - got StartTheSagaMessage: {0}", message.ProcessId);
//The saga is started - Do some stuff - call webservices (in external handler)
//When this step is finished the external process replies with a "step1FinishedMessage"
this.Data.SagaProcessId = message.ProcessId;
//Fake Step1FinishMessage (should be replied from external handler)
await Bus.Send(new Step1FinishedMessage() { ProcessId = this.Data.SagaProcessId });
}
public async Task Handle(Step1FinishedMessage message)
{
Trace.TraceInformation("Mysaga - got Step1FinishedMessage: {0}", message.ProcessId);
//Sagabehaviour when the Step1 is finished by the external handler
this.Data.Step1Finished = true;
//After dalying 10 seconds - Send a step2finishedmessage
await Bus.Defer(TimeSpan.FromSeconds(10), new Step2FinishedMessage() { ProcessId = this.Data.SagaProcessId });
}
public async Task Handle(Step2FinishedMessage message)
{
await Task.Run(() =>
//return Task.FromResult<void>(() =>
{
Trace.TraceInformation("Mysaga - got Step2FinishedMessage: {0}", message.ProcessId);
//Step2 is handled - finished the saga
this.Data.Step2Finished = true;
this.MarkAsComplete();
});
}
}
Полный пример основан на solution available here.
Что я делаю неправильно?
Спасибо за помощь.
* Решение доступно здесь * использует Autofac? – qujck
Да, да. Исходное решение также использует старые пакеты nuget. – ilcorvo
Да, я вижу, и обновление всех пакетов создает код, который не компилируется. – qujck