2013-12-20 8 views
6

В последнее время у меня много проблем из-за того, что кажется ошибкой в ​​пакете MassTransit.UnityIntegration, в первую очередь из-за того, что регистрационные имена не рассматриваются.Несколько потребителей за одно и то же сообщение через Unity, не работающее в MassTransit

Например, если зарегистрировать мои классы, как это:

var container = new UnityContainer() 
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1") 
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3"); 

несколько строк позже, я использую метод LoadFrom расширения, чтобы получить зарегистрированных потребителей в контейнере, как это:

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc => 
    { 
     _sbc.UseBinarySerializer(); 
     _sbc.UseControlBus(); 
     _sbc.ReceiveFrom("msmq://localhost/MyQueue"); 
     _sbc.UseMsmq(_x => 
      { 
       _x.UseSubscriptionService("msmq://localhost/mt_subscriptions"); 
       _x.VerifyMsmqConfiguration(); 
      }); 
     _sbc.Subscribe(_s => _s.LoadFrom(container)); 
    }); 

Что происходит, так это то, что мои обработчики никогда не вызываются, когда связанные сообщения попадают в шину.

После размышления на некоторое время, я решил взглянуть на реализацию, и стало ясно, почему это происходит:

Это основной код внутри метода LoadFrom:

public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container) 
{ 
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>()); 
    if (concreteTypes.Count > 0) 
    { 
     var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container); 

     foreach (Type concreteType in concreteTypes) 
      consumerConfigurator.ConfigureConsumer(concreteType); 
    } 

    ... 

} 

Обратите внимание, что он только находит типы и не передает никакой информации об именах вперед. Это FindTypes<T> реализация:

static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter) 
{ 
    return container.Registrations 
         .Where(r => r.MappedToType.Implements<T>()) 
         .Select(r => r.MappedToType) 
         .Where(filter) 
         .ToList(); 
} 

После нескольких indirections, все сводится к этой единственной линии, внутри UnityConsumerFactory<T> класса, который фактически создает экземпляр потребителя:

var consumer = childContainer.Resolve<T>(); 

Это абсолютно не будет работать с Unity при наличии нескольких регистраций, поскольку единственный способ зарегистрировать (а затем разрешить) несколько реализаций в Unity - дать им имя в вызове RegisterType, а позже указать это имя в вызове Resolve.

Возможно, мне не хватает чего-то совершенно элементарного во всем этом, и ошибка с моей стороны? Источник для компонентов MassTransit Unity можно найти here. Я не рассматривал код для других контейнеров, потому что я не знаком с ними, но я полагаю, что это было обработано каким-то образом? Я думаю, что более одного потребителя для одного и того же типа сообщений внутри одного и того же контейнера на самом деле довольно распространены.

В этом конкретном случае было бы лучше передать не только Type из регистрации в контейнере, но и имя, используемое для регистрации.

Update

Ну проблема немного более ясно, что Трэвис взял время, чтобы объяснить это. Я должен был заметить это раньше.

Кажется, я должен регистрировать типы непосредственно они должны быть правильно решены внутри завода, как это:

var container = new UnityContainer() 
    .RegisterType<Handler1>() 
    .RegisterType<Handler3>(); 

При таком подходе можно также опустить регистрационное имя, так как теперь их ключи сборки внутри контейнера разные.

Хорошо, это сработало бы отлично, если бы это был наш реальный сценарий, но это не так.Позвольте мне объяснить, что именно мы делаем:

Перед тем как мы начали использовать MassTransit, мы уже имели интерфейс, используемый для шаблона команды, называемый ICommandHandler<TCommand>, где TCommand является базовой моделью для команд в системе. Когда мы начали рассматривать использование служебной шины, с самого начала было ясно, что можно будет переключиться позже на другую шину служебной шины без особых хлопот. Имея это в виду, я приступил к созданию абстракции над нашим командным интерфейсом, чтобы вести себя как один из потребителей, которого ожидает MT. Это то, что я придумал:

public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All 
    where T : class, ICommand 
{ 
    private readonly ICommandHandler<T> _commandHandler; 

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler) 
    { 
     _commandHandler = commandHandler; 
    } 

    public void Consume(T _message) 
    { 
     _commandHandler.Handle(_message); 
    } 
} 

Это очень простой класс адаптера. Он получает реализацию ICommandHandler<T> и заставляет ее вести себя как экземпляр Consumes<T>.All. К сожалению, MT required message models to be classes, так как у нас не было этого ограничения для наших команд, но это было небольшим неудобством, и мы продолжили добавлять ограничение where T : class к нашим интерфейсам.

Затем, поскольку наши интерфейсы обработчиков уже были зарегистрированы в контейнере, было бы необходимо зарегистрировать интерфейс MT с этой реализацией адаптера и позволить контейнеру вставлять в него реальные реализации. Например, более реалистичный пример (взятый прямо из нашей базы коды):

.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder") 
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor") 
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>() 
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>> 
    ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder"))) 
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>> 
    ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor"))) 

Названных регистрации есть немного запутанные, но требуется, так как мы теперь имеем два потребителя для того же сообщения. Хотя это и не так чисто, как мы надеялись, мы могли бы жить с этим, так как это способствует огромному развязыванию нашего кода от конкретной логики MassTransit: класс адаптера находится в отдельной сборке, на которую обращается ТОЛЬКО конечный уровень в системе, для целей регистрации контейнера , Это кажется очень хорошей идеей, но теперь подтверждается неподдерживаемой логикой поиска классов интеграции контейнеров.

Обратите внимание, что я не могу зарегистрировать конкретные классы здесь, так как в середине есть общий класс адаптера.

Update 2:

После следуя совету Трэвиса, я попробовал этот простой код, который также не работает (я не понимаю, почему, как это кажется вполне допустимо). Это явная регистрация потребителя завод без автоматической интеграции контейнера:

_sbc.Consume(() => container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder")) 

Это решительность вызов правильно дает мне ранее зарегистрированный CommandHandlerToConsumerAdapter<ApplicationInstallationCommand> экземпляр, который реализует Consumes<ApplicationInstallationCommand>.All, который, в свою очередь, должен быть один из базовых интерфейсов, поддерживаемых. Публикация ApplicationInstallationCommand сразу после этого ничего не делает, как если бы обработчик был недействителен или что-то подобное.

Это работает, хотя:

_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder")) 

Очевидно, что-то глубоко в API обрабатывает тип компиляции в не обобщенным образом своего рода, а не основываясь на общем интерфейсе.

Я имею в виду ... он работает с этим, но регистрационный код становится запутанным без видимой причины (из-за того, что я буду рассматривать как «нестандартные детали реализации» на стороне MT). Может быть, я просто хватаюсь за соломинку здесь? Возможно, все это сводится к «почему MT не признает собственный, уже общий интерфейс?«Зачем нужен конкретный тип во время компиляции, чтобы увидеть, что это обработчик сообщений, даже если экземпляр, который я передаю ему, набирается как Consumes<X>.All, также во время компиляции?

Update 3:

После обсуждения с Трэвисом ниже, я решил полностью отказаться от сборки UnityIntegration и идти с автономными Consumer звонков на подписку.

Я создал небольшой класс расширения в нашей MassTransit конкретной сборке, чтобы облегчить вещи:

public static class CommandHandlerEx 
{ 
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler) 
     where T : class, ICommand 
    { 
     return new CommandHandlerToConsumerAdapter<T>(_handler); 
    } 
} 

И наконец зарегистрировал обработчик, как это:

var container = new UnityContainer() 
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder") 
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor"); 

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc => 
    { 
     _sbc.UseBinarySerializer(); 
     _sbc.UseControlBus(); 
     _sbc.ReceiveFrom("msmq://localhost/MyQueue"); 
     _sbc.UseMsmq(_x => 
      { 
       _x.UseSubscriptionService("msmq://localhost/mt_subscriptions"); 
       _x.VerifyMsmqConfiguration(); 
      }); 
     _sbc.Subscribe(RegisterConsumers); 
    }); 

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s) 
{ 
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer()); 
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer()); 
} 

После использования в течение всего дня вчера попробуйте разобраться, я настоятельно рекомендую вам держаться подальше от сборщиков контейнеров, если вы хотите, чтобы ожидаемое поведение из контейнера и/или если вы хотите настроить классы и т. д. (например, я сделал, чтобы отделить наши классы обмена сообщениями от MT specific co de) по двум основным причинам:

  1. Логика в расширениях проходит через регистрацию в контейнере, чтобы найти потребительские классы. Это, на мой взгляд, ужасный дизайн. Если что-то хочет реализовать из контейнера, он должен просто позвонить Resolve или ResolveAll на свой интерфейс (или их эквивалент в терминах Unity), не заботясь о том, что именно зарегистрировано и каковы их конкретные типы. Это может иметь серьезные последствия с кодом, который предполагает, что контейнер может возвращать типы, которые не были явно зарегистрированы. К счастью, это не относится к этим классам, но у нас есть расширение контейнера, которое автоматически создает типы декораторов на основе ключа сборки, и они не обязательно должны быть явно зарегистрированы в контейнере.

  2. Потребительская регистрация использует имущество MappedToType на экземпляре ContainerRegistration для звонка Resolve на контейнер. Это абсолютно неверно в любой ситуации, а не только в контексте MassTransit. Типы в Unity регистрируются как отображение (например, в выдержках выше, с компонентом From и To) или непосредственно в виде одного конкретного типа. В случаях BOTH логика должна использовать тип RegisteredType для разрешения из контейнера. Теперь он работает так, что если вы зарегистрируете обработчики с их интерфейсами, MT полностью обходит вашу регистрационную логику и вместо этого разрешит вызов конкретного типа, который works in Unity out of the box, что может привести к непредсказуемому поведению, потому что вы думаете, что это должен быть синглтон например, вы зарегистрировались, но, к примеру, это скорее переходный объект (по умолчанию).

Оглядываясь назад, я вижу, что это было намного сложнее, чем я изначально считал. В процессе тоже было немного обучения, так что это хорошо.

Update 4:

Вчера я решил реорганизовать весь адаптер подойти немного, прежде чем сделать окончательное фиксирование. Я пошел с шаблоном интерфейса MassTransit, чтобы создать свои адаптеры, потому что я думаю, что это очень красивый и чистый синтаксис.

Вот результат:

public sealed class CommandHandlerToConsumerAdapter<T> 
    where T : class, ICommand 
{ 
    public sealed class All : Consumes<T>.All 
    { 
     private readonly ICommandHandler<T> m_commandHandler; 

     public All(ICommandHandler<T> _commandHandler) 
     { 
      m_commandHandler = _commandHandler; 
     } 

     public void Consume(T _message) 
     { 
      m_commandHandler.Handle(_message); 
     } 
    } 
} 

Unfortunatelly это ломает код MassTransit из-за необработанного исключения по методу полезности в ссылочной библиотеки Magnum, на метод расширения под названием ToShortTypeName.

Вот исключение:

ArgumentOutOfRangeException in MassTransit receive

на System.String.Substring (Int32 StartIndex, длина Int32)
в Magnum.Extensions.ExtensionsToType.ToShortTypeName (Тип)
на MassTransit.Pipeline.Sinks.ConsumerMessageSink 2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext 1 контекст) в d: \ BuildAgent-02 \ work \ aa063b4295dfc097 \ src \ MassTransit \ Pipeline \ Sinks \ ConsumerMessageSink.cs: строка 51 на MassTransit.Pipeline.Sinks.InboundConvertMess ageSink`1. <> c__DisplayClass2. <> c__DisplayClass4.b__1 (IConsumeContext х) в D: \ BuildAgent-02 \ работа \ aa063b4295dfc097 \ SRC \ MassTransit \ Трубопроводный \ Раковины \ InboundConvertMessageSink.cs: линия 45 на MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers (IReceiveContext контексте) в г : \ BuildAgent-02 \ работа \ aa063b4295dfc097 \ SRC \ MassTransit \ Context \ ServiceBusReceiveContext.cs: строка 162

ответ

4

Пока я не знаю, интеграция единства со всеми контейнерами, вы должны зарегистрировать свои потребители, как конкретный тип в контейнере, а не интерфейсы Consumes<>. Я предполагаю, что это всего лишь RegisterType<Handler1, Handler1>(), но я не совсем уверен в этом.

Если вам не нравится расширение для вашего контейнера LoadFrom, вам не нужно его использовать. Вы всегда можете просто разрешить потребителей самостоятельно и зарегистрировать их через _sbc.Consume(() => container.resolve<YourConsumerType>()) в конфигурации вместо этого. Расширение LoadFrom - это просто убеждение для людей, которые используют контейнер обычным способом.

Следующий код работает, который использует контейнер так, как я ожидал бы, не зная вашего домена больше, один для его использования. Если вы хотите понять, как сообщения привязаны немного лучше, я бы предложил использовать RabbitMQ, потому что вы можете легко увидеть, где все закончилось, заменив привязки обмена. На данный момент это намного больше, чем вопрос SO, я бы привел это в список рассылки, если у вас есть что-то еще.

using System; 
using MassTransit; 
using Microsoft.Practices.Unity; 

namespace MT_Unity 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      using (var container = new UnityContainer() 
       .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>() 
       .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>()) 

      using (IServiceBus consumerBus = ServiceBusFactory.New(sbc => 
        { 
         sbc.ReceiveFrom("rabbitmq://localhost/consumer"); 
         sbc.UseRabbitMq(); 


         sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>())); 
        })) 
      using (IServiceBus publisherBus = ServiceBusFactory.New(sbc => 
        { 
         sbc.ReceiveFrom("rabbitmq://localhost/publisher"); 
         sbc.UseRabbitMq(); 
        })) 
      { 
       publisherBus.Publish(new MyCommand()); 

       Console.ReadKey(); 
      } 
     } 
    } 

    public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand 
    { 
     private readonly ICommandHandler<T> _commandHandler; 

     public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler) 
     { 
      _commandHandler = commandHandler; 
     } 

     public void Consume(T message) 
     { 
      _commandHandler.Handle(message); 
     } 
    } 

    public interface ICommand { } 
    public class MyCommand : ICommand { } 

    public interface ICommandHandler<T> where T : class, ICommand 
    { 
     void Handle(T message); 
    } 

    public class MyCommandHandler : ICommandHandler<MyCommand> 
    { 
     public MyCommandHandler() 
     { 

     } 
     public void Handle(MyCommand message) 
     { 
      Console.WriteLine("Handled MyCommand"); 
     } 
    } 

} 
+0

Не могли бы вы немного разобраться? Я не понимаю, что вы говорите. То, что я там делаю, отображает Contumes . Весь интерфейс для двух конкретных типов: Handler1 и Handler3. Это очень необычно с единством, может быть, это просто синтаксическая путаница здесь с вашей стороны? Кстати, вы, вероятно, знаете, кто на самом деле закодировал части интеграции единства. Можно ли, возможно, связаться с ними напрямую и/или указать их здесь? Спасибо за быстрый ответ, как всегда, Тревис;) – julealgon

+0

https://github.com/MassTransit/MassTransit/blob/v2.8.0/src/Containers/MassTransit.UnityIntegration/UnityExtensions.cs#L69 Мы ищем типы, которые реализуют «Потребности <> 'не типы' Потребности <> '. Так мы делаем это во всех контейнерах. Поэтому, когда вы регистрируетесь, вместо регистрации в качестве «Потребностей»> «Все» регистрируют его как конкретный тип. – Travis

+0

А теперь я понимаю, что вы имели в виду, зарегистрировав конкретные типы. Что ж, для меня это совершенно бессмысленно. Я не знал, что вы, ребята, использовали тип .MappedTo, чтобы позже разрешить контейнер. Извините, но это кажется совершенно неправильным. Я получил код для этих классов в пятницу и начал изменять источник.Позже сегодня я, вероятно, закончу его тем, что кажется правильным для меня: я передам зарегистрированный тип и имя на завод и разрешу их там. Я отредактирую вопрос, чтобы показать вам, почему ваш подход не работает в моем контексте. – julealgon