2015-11-07 6 views
40

Джимми Boagard описывает McDonalds сеть быстрого питания here сравнивая его с scatter gather pattern.Как реализовать сагу, используя разброс/Собирают шаблон В MassTransit 3,0

Workflow изображения украдены из выше статьи: enter image description here

первоначального внедрения Мысли:

Чтобы иметь общий интерфейс для всех видов событий, связанных с FoodOrdered, которые получат все продовольственные станции, а затем каждая пищевая станция сможет потреблять/создавать свои соответствующие и опубликуйте общее событие. Пример: картошка фри и булочная с начинкой получает сообщение о заказе Fries. Станция жаркого, потребляющая заказ, объявляет ItemDoneEvent, который слушает сага.

Первоначальные проблемы:

Поскольку Saga не заботится о типе пищи завершено только тот факт, что вся еда будет завершена в этом, казалось бы, быть OK решение. Однако после с предупреждениями here об обмене очередями и замечанием, что Consumer.Conditional filtering has been removed with MassTransit 3.0 Кажется, что в рамках фреймворка говорится: «Плохие вещи (ТМ) произойдут» с этим типом подхода. Но я не уверен, как еще вы это сделаете, не создавая запрос и ответ на сообщение, а также сопоставляя Event для каждого продукта питания на кухне. Пример: FriesOrdered, BurgerOrdered FriesCooked, BurgerCooked. Это было бы очень утомительно, если бы вам приходилось делать это для каждого предмета на кухне?

Учитывая вышеуказанные проблемы - как выглядит хороший пример саги для такого типа рабочего процесса?

+4

Я могу провести качели в течение выходных и поместить образец в репозиторий MT. –

+1

Крис, у тебя когда-нибудь был шанс задуматься? В настоящее время я рассматриваю аналогичную проблему. –

+2

Почему вы не можете хранить список заказанных продуктов внутри экземпляра Saga и удалять элементы из списка или отмечать как «сделанные» в объектах значения списка, когда вы получаете общий «FoodReady» с конкретный 'FoodType' внутри сообщения? Когда вы в конце концов узнаете, что список пуст, вы можете завершить сагу. –

ответ

0

Проблема с отменой завершенных событий в саге заключается в том, что она создает конфликт на общем ресурсе (то есть в состоянии саги).

У Джима есть другое сообщение, которое появилось после того, на которое вы ссылались, который описывает проблему и решение. Конечно, он специально говорит о NServiceBus, но проблема и концепции одинаковы.

https://lostechies.com/jimmybogard/2014/02/27/reducing-nservicebus-saga-load/

Создать внешнее запоминающее устройство. Поместите запись для каждого рабочего элемента. Пусть каждый работник завершает свою работу, пока сага эффективно опроса использует задержку обмена сообщениями, чтобы убедиться, что вся работа выполнена.

Затем вы по-прежнему выполняете сборку рассеяния, но «агрегатор» заменен шаблоном диспетчера процессов, чтобы уменьшить конфликт.

0

У меня возникла аналогичная проблема - вам нужно опубликовать несколько десятков команд (все те же интерфейсы, IMyRequest) и подождать все.

На самом деле моя команда инициирует другую сагу, которая публикует IMyRequestDone в конце обработки без сохранения саги. (Необходимо завершить их через некоторое время.) Поэтому вместо сохранения числа завершенных вложенных саг в родительской саге я просто запрашиваю состояние экземпляров детской саги.

Проверка на каждом MyRequestDone сообщение:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x => 
{ 
    // timeout for all requests 
    x.Delay = TimeSpan.FromMinutes(10); 
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
}); 


During(Active, 
    When(Xxx) 
     .ThenAsync(async context => 
     { 
      await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); 
      await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); 

      context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay; 
      context.Instance.WaitingMyResponsesCount = 2; 
     }) 
     .TransitionTo(WaitingMyResponses) 
     .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance)) 
    ); 

During(WaitingMyResponses, 
    When(MyRequestDone) 
     .Then(context => 
     { 
      if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow) 
       throw new TimeoutException(); 
     }) 
     .If(context => 
     { 
      var db = serviceProvider.GetRequiredService<DbContext>(); 
      var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); 
      var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && 
       requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed 
      return allDone; 
     }, x => x 
      .Unschedule(FailSagaOnRequestsTimeout) 
      .TransitionTo(Active)) 
     ) 
     .Catch<TimeoutException>(x => x.TransitionTo(Failed)) 
); 

During(WaitingMyResponses, 
    When(FailSagaOnRequestsTimeout.Received) 
     .TransitionTo(Failed) 

Периодически проверяйте, что все запросы, сделанные (по "Снижение NServiceBus Saga нагрузкой"):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x => 
{ 
    // check interval 
    x.Delay = TimeSpan.FromSeconds(15); 
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
}); 

During(Active, 
    When(Xxx) 
     .ThenAsync(async context => 
     { 
      await context.Publish(context => new MyRequestCommand(context.Instance, "foo")); 
      await context.Publish(context => new MyRequestCommand(context.Instance, "bar")); 

      context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10); 
      context.Instance.WaitingMyResponsesCount = 2; 
     }) 
     .TransitionTo(WaitingMyResponses) 
     .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)) 
    ); 

During(WaitingMyResponses, 
    When(CheckAllRequestsDone.Recieved) 
     .Then(context => 
     { 
      var db = serviceProvider.GetRequiredService<DbContext>(); 
      var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList(); 
      var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount && 
       requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); 
      if (!allDone)   
      { 
       if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)    
        throw new TimeoutException(); 
       throw new NotAllDoneException(); 
      } 
     }) 
     .TransitionTo(Active) 
     .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))) 
     .Catch<TimeoutException>(x => x.TransitionTo(Failed)); 
1

Может ты не "просто" передать объект в очереди, как параметр события? Когда слушатель саги получает событие «завершено заказ», он будет содержать объект, который будет завершен в случае?

Я полагаю, что отправляется в очередь с помощью шаблонного метода, где объект должен реализовать IFoodOrdered

Тогда вы можете на реализацию виртуального метода, что сага может использовать, чтобы сделать «общую» вещь, когда это и вам нужно только выполнить перегрузки для тех специальных предметов, которые требуют чего-то особенного?