2015-07-14 6 views
2

Мы используем RabbitMQ для сообщений о очередности в C# .Net (клиент EasyNetQ).с использованием многопроцессорного метода EasyNetQ для одного потребителя не работает

Я хочу, чтобы одно потребительское приложение (приложение консоли C#) слушало одну очередь и предоставляло несколько обработчиков для каждого типа сообщений.

Я реализовал этот сценарий и мой код здесь:

using (var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100") 
            .Advanced) 
{ 
    var queue = advancedBus.QueueDeclare("MyQueue"); 

    advancedBus.Consume(queue, x => x 
       .Add<MessageType1>((message, info) => 
       { 
        Console.WriteLine("MessageType1 Body : " + message.Body.Body); 
       }) 
       .Add<MessageType2>((message, info) => 
       { 
        Console.WriteLine(" MessageType2 Body: " + message.Body.Body); 
       }).ThrowOnNoMatchingHandler = false); 
} 

Моя проблема: Но когда я исполняю этот потребитель не делает ничего. ничего не происходит.

я публиковать сообщения в этой очереди, как это:

using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced) 
{ 
    var queue = advancedBus.QueueDeclare("MyQueue"); 

    if (advancedBus.IsConnected) 
     advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, false, 
      new Message<MessageType1>(change)); 
    else 
     result = false; 
} 

чем проблема.

+1

ли 'advancedBus.IsConnected' провалится? Вы можете вообще подключиться к кролику? –

+0

Спасибо за ваш комментарий, Да. это связано. когда я подписываюсь на конкретный тип сообщения, он отлично работает, но в моем сценарии лука это не работает. – Javad

ответ

2

Ok, после тестирования этого кода, это проблемы:

Прежде всего, вы утилизацию вашего advancedBus сразу после регистрации для потребления. Вы должны помнить, что при вызове IAdvanceBus.Consume, вы регистрируете обратный вызов для каждого сообщения. Если вы удаляете шину сразу после регистрации, ваш делегат не может быть вызван, поскольку соединение уже было закрыто. Таким образом, вы будете удалить using заявление вокруг декларации кролика (не забудьте утилизировать его, когда вы закончите):

var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100").Advanced 

Во-вторых, immediate флаг has been deprecated and shouldn't be used, сообщение не кажется, добираться до очереди. Изменить Publish на:

advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false, 
        new Message<MessageType1>(change)); 

Кроме того, если вы используете это из консольного приложения, не забудьте использовать Console.ReadKey так что ваш основной поток не прекращается.

Вот рабочий пример кода:

static void Main() 
{ 
    var change = new MessageType1(); 
    var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced; 

    ConsumeMessage(advancedBus); 

    var queue = advancedBus.QueueDeclare("MyQueue"); 
    if (advancedBus.IsConnected) 
    { 
     advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false, 
      new Message<MessageType1>(change)); 
    } 
    else 
    { 
     Console.WriteLine("Can't connect"); 
    } 

    Console.ReadKey(); 
} 

private static void ConsumeMessage(IAdvancedBus advancedBus) 
{ 
    var queue = advancedBus.QueueDeclare("MyQueue"); 
    advancedBus.Consume(queue, registration => 
    { 
     registration.Add<MessageType1>((message, info) => 
     { 
      Console.WriteLine("Body: {0}", message.Body); 
     }); 
    }); 
} 
+0

Спасибо, уважаемый Юваль, он решил мою проблему. Устранение было моей проблемой. – Javad

+0

@ Javad Вы приветствуете :) –

 Смежные вопросы

  • Нет связанных вопросов^_^