2016-07-06 5 views
0

То, что я пытаюсь достичь, - это мой веб-сайт, который поднимает сообщение и помещает его в автобус, служба выбирает его и записывает в базу данных с аудитом, который автоматически заполняет поле AddBy/UpdateBy строки.NServiceBus и NHibernate EventListeners, работающие на разных потоках

Я делаю это с помощью компонента NServiceBus IMessageMutator, который записывает идентификатор пользователя в заголовки сообщений из Thread.CurrentPrincipal, который поступает от зарегистрированного пользователя в моем приложении ASP.Net. В моем сервисе я использую IMessageModule, чтобы извлечь этот заголовок и привязать его к Thread.CurrentPrincipal. Это отлично работает, и во время моего обработчика сообщений я вижу Thread.CurrentPrincipal.Identity.Name правильно привязан к идентификатору пользователя, который поднял сообщение в веб-приложении.

Использование IPreUpdateEventListener/IPreInsertEventListener из NHibernate Я устанавливаю AddBy/UpdateBy каждого объекта перед его записью в БД. Это работает на веб-сайте отлично, но в моем сервисе NServiceBus поток, выполняемый слушателем, отличается от потока, над которым работал обработчик, что означает, что CurrentPrincipal потока больше не является идентификатором, связанным в моем IMessageModule.

Я вижу, что NHibernate использует DistributedTransactionFactory в стеке вызовов, который, как я подозреваю, является причиной моей проблемы. Я не хочу потерять транзакцию, так что, если коммит не удался, сообщение не перепробовано или не помещено в очередь ошибок, и если удаление сообщения из очереди завершается с ошибкой, и обновление не возвращается к БД.

Я просмотрел веб-страницы, и все примеры используют CurrentPrincipal потока для привязки идентификатора пользователя, который изменил строки. То, что я ищу, - это способ либо сохранить слушателя NHibernate в том же потоке, что и обработчик сообщения, либо передать идентификатор пользователя слушателю, чтобы он мог привязываться к сущности до того, как он будет записан в БД.

Вот мой слушатель, я опустил метод Set нашел в нем

public class EntityPersistenceListener : IPreUpdateEventListener, IPreInsertEventListener 
{ 
    public bool OnPreUpdate(PreUpdateEvent @event) 
    { 
     var audit = @event.Entity as EntityBase; 
     if (audit == null) 
      return false; 

     var time = DateTimeFactory.GetDateTime(); 

     var name = Thread.CurrentPrincipal.Identity.Name; 

     Set(@event.Persister, @event.State, "AddedDate", audit.AddedDate); 
     Set(@event.Persister, @event.State, "AddedBy", audit.AddedBy); 
     Set(@event.Persister, @event.State, "UpdatedDate", time); 
     Set(@event.Persister, @event.State, "UpdatedBy", name); 

     audit.AddedDate = audit.AddedDate; 
     audit.AddedBy = audit.AddedBy; 
     audit.UpdatedDate= time; 
     audit.UpdatedBy = name; 

     return false;    
    } 
} 

А вот модуль NServiceBus сообщений, который извлекает идентификатор и связывает его с идентичностью текущего потока

public class TenantAndInstanceInfoExtractor : IMessageModule 
{ 
    private readonly IBus _bus; 

    public TenantAndInstanceInfoExtractor(IBus bus) 
    { 
     _bus = bus; 
    } 

    public void HandleBeginMessage() 
    { 
     var headers = _bus.CurrentMessageContext.Headers; 

     if (headers.ContainsKey("TriggeredById")) 
      Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(headers["TriggeredById"]), null); 
     else 
      Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(string.Empty), null); 
    } 

    public void HandleEndMessage() 
    { 

    } 

    public void HandleError() { } 
} 
+1

Morgan. У меня возникли проблемы с полным пониманием вашего требования. У вас есть время для вызова Skype на следующей неделе? Я «simon.cropp» на skype – Simon

+0

Привет, Саймон, да, пожалуйста. Я пришлю вам письмо, чтобы договориться о времени. – Morgan

ответ

0

Спасибо, Саймон, за вашу помощь. После интенсивного рассмотрения моей проблемы и обсуждения того, как NServiceBus работает внутри, я понял ваше понимание и внедрил модуль работы для NServiceBus.

Что происходит, мы полагались на транзакцию, созданную для каждого сообщения, для фиксации нашей сессии NHibernate в БД. Это происходит на контроллере распределенных транзакций (в частности, здесь происходит NHibernate.Transaction.AdoNetWithDistributedTransactionFactory.DistributedTransactionContext), который использует пул потоков.

Используя интерфейс IManageUnitsOfWork от NServiceBus, я смог явно передать транзакцию в том же потоке, что и обработчик сообщений, как показано ниже в примере кода.

В качестве побочного примечания для будущих читателей лучшим решением здесь является не использование Thread.CurrentPrincipal, так как это решение выходит из строя в многопоточных средах, как это имеет место для меня.

public class HiJumpNserviceBusUnitOfWork : IManageUnitsOfWork 
{ 
    private readonly IUnitOfWork _uow; 

    public HiJumpNserviceBusUnitOfWork(IUnitOfWork uow) 
    { 
     _uow = uow; 
    } 

    public void Begin() 
    { 
     _uow.ClearCache(); 
     _uow.BeginTransaction(); 
    } 

    public void End(Exception ex = null) 
    { 
     if (ex != null) 
     { 
      _uow.ClearCache(); 
     } 
     else 
     { 
      _uow.CommitTransaction(); 
     } 
    } 
}