2015-06-24 2 views
1

Предположим, у меня есть команда для редактирования одной записи статьи, называемой ArticleEditCommand.CQRS, несколько узлов записи для одной записи агрегата, при сохранении параллелизма

  • Пользователь 1 выдает ArticleEditCommand на основании V1 статьи.
  • Пользователь 2 выпускает ArticleEditCommand на основе V1 такой же
    артикул.

Если я могу гарантировать, что мои узлы обрабатывать старые ArticleEditCommand команд первыми, я могу быть уверен, что команда от пользователя 2 будет не потому, что команда пользователя 1 будет изменила версию статьи к V2.

Однако, если у меня есть два узла, обрабатывайте сообщения ArticleEditCommand одновременно, даже если команды будут выполняться в очереди в правильном порядке, я не могу гарантировать, что узлы будут обрабатывать первую команду перед второй командой из-за всплеск в CPU или что-то подобное. Я мог бы использовать транзакцию sql для обновления статьи, где version = expectedVersion, и учитывать количество изменений записей, но мои правила более сложны и не могут жить исключительно в SQL. Я бы хотел, чтобы вся моя логика обработки команд была параллельной между ArticleEditCommand сообщениями, которые изменяют эту же статью.

Я не хочу блокировать очередь во время обработки команды, потому что точка с несколькими обработчиками команд - это одновременное управление командами для масштабируемости. При этом я не против, чтобы эти команды обрабатывались последовательно, но только для одного экземпляра/идентификатора статьи. Я не ожидаю большого объема сообщений ArticleEditCommand для отправки по одной статье.

С учетом сказанного, вот вопрос.

Есть ли способ обрабатывать команды последовательно по нескольким узлам для одного уникального объекта (запись базы данных), но одновременно обрабатывать все другие команды (отдельные записи базы данных)?

Или это проблема, которую я создал из-за недостаточного понимания CQRS и параллелизма?

Является ли это проблемой, которую обычно решали брокеры сообщений? Например, Windows Service Bus, MSMQ/NServiceBus и т. Д.?

EDIT: Я думаю, что я знаю, как справиться с этим сейчас. Когда пользователь 2 выдает ArticleEditCommand, исключение должно быть выбрано для пользователя, позволяющего им знать, что текущая ожидающая операция над этой статьей должна быть завершена до этого момента, чтобы поставить в очередь ArticleEditCommand. Таким образом, в очереди нет двух сообщений ArticleEditCommand, которые влияют на одну и ту же статью.

ответ

0

Прежде всего позвольте мне сказать, если вы не ожидаете большого объема сообщений ArticleEditCommand, это звучит как преждевременная оптимизация.

В других решениях эта проблема обычно не решена брокерами сообщений, а оптимистичная блокировка, обеспечиваемая реализацией сохранения. Я не понимаю, почему простое поле version для оптимистичной блокировки, которое может быть тривиально обработано SQL, противоречит сложной бизнес-логике/обновлениям, может быть, вы могли бы разработать больше?

+0

Ваше право. Мне было любопытно больше всего. Я уверен, что кто-то наткнулся на него, и мне было интересно, как с ним справиться. –

0

Это на самом деле довольно просто, и я сделал это.В основном, это выглядит следующим образом (псевдокод)

//message handler 
ModelTools.TryUpdateEntity(
()=>{ 
     var entity= _repo.Get(myId); 
     entity.Do(whateverCommand); 
     _repo.Save(entity); 
     } 
10); //retry 10 times until giving up 

//repository 
long? _version; 
public MyObject Get(Guid id) 
{ 
    //query data and version 
    _version=data.version; 
    return data.ToMyObject(); 
    } 

public void Save(MyObject data) 
{ 
    //update row in db where version=_version.Value 

    if (rowsUpdated==0) 
    { 
      //things have changed since we've retrieved the object 
     throw new NewerVersionExistsException(); 
    } 
} 

ModelTools.TryUpdateEntity и NewerVersionExistsException являются частью моей CavemanTools цели библиотеки родовой (доступной на NuGet).

Идея состоит в том, чтобы попытаться сделать что-то в обычном режиме, а затем, если изменилась версия объекта (rowversion/timestamp в sql), мы снова повторим всю операцию после ожидания пары миллисекунд. И это именно то, что делает метод TryUpdateEntity(). И вы можете настроить, сколько стоит ждать между попытками или сколько раз он должен повторять операцию.

Если вам нужно уведомить пользователя, то забудьте про повторную попытку, просто поймайте исключение напрямую, а затем сообщите об этом пользователю, чтобы обновить или что-то еще.

0

Перегородка решения на основе

Достичь узел липкость путем маршрутизации входящей команды на основе ID объекта (например. ArticleID по модулю ваше-число-узлов), чтобы убедиться командами user1 и user2 заканчивается на том же узле, а затем обрабатывать команды последовательно. Вы можете обрабатывать все команды по одному или если вы хотите распараллеливать выполнение, разделяйте команды на что-то вроде ID, нечетное/четное, по странам или тому подобное.

Сетка на основе решения

Используйте в памяти сетки (например, Hazelcast или когерентность) и использовать распределенную палачу Service (http://docs.hazelcast.org/docs/2.0/manual/html/ch09.html#DistributedExecution) или аналогичную координировать обработку команд в кластере.

Независимо - перед тем, как добавить эту сложность, вы должны, конечно, спросить себя, действительно ли проблема, если команда User2 будет принята, а User1 получит ошибку параллелизма. Пока изменения User1 не теряются и могут быть повторно применены после обновления статьи, это может быть совершенно нормально.