2014-01-27 1 views
7

У меня есть простой издатель, сделанный в MassTransit. Я отправляю сообщение через интервал и могу получить его от клиента .NET с помощью MassTransit. Но когда я пытаюсь что-то наблюдать с Python, он молчит. Есть ли способ использовать MassTransit с Python или других языков? Примеры были оценены.Comsuming MassTransit от Python или других языков

Издательство:

builder.Register(c => ServiceBusFactory.New(sbc => { 
    sbc.UseRabbitMq(); 
    sbc.UseBsonSerializer(); 
    sbc.UseLog4Net(); 

    sbc.ReceiveFrom("rabbitmq://localhost/masstransit"); 
}); 

.NET клиент:

public void Execute(IJobExecutionContext context) { 
    using (var scope = ServiceLocator.Current.GetInstance<ILifetimeScope>().BeginLifetimeScope()) { 
     var log = scope.Resolve<ILog>(); 
     log.Debug("Sending queue message"); 

     var bus = scope.Resolve<IServiceBus>(); 
     bus.Publish(new SimpleTextMessage{Text = "some text"}); 
    } 
} 

Python клиент:

import pika 
print('Stating consumer') 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost')) 
channel = connection.channel() 

channel.queue_declare(queue='python_consumer_1') 

print ' [*] Waiting for messages. To exit press CTRL+C' 

def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 

channel.basic_consume(callback, queue='python_consumer_1') 
channel.start_consuming() 

След от C# приложения:

Configuration Result: 
[Success] Name MyApp 
[Success] ServiceName MyApp 
Topshelf v3.1.122.0, .NET Framework v4.0.30319.34003 
INFO (MassTransit.BusConfigurators.ServiceBusConfiguratorImpl) 209 - MassTransit v2.9.2/v2.9.0.0, .NET Framework v4.0.30319.34003 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 245 - CreatingRabbitMQ connection: rabbitmq://localhost/groups_error 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 246 - Using default configurator for connection: rabbitmq://localhost/groups_error 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 251 - RabbitMQconnection created: localhost:5672// 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 921 - Creating RabbitMQ connection: rabbitmq://localhost/groups 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 922 - Using default configurator for connection: rabbitmq://localhost/groups 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 924 - RabbitMQconnection created: localhost:5672// 
DEBUG(MassTransit.ServiceContainer) 1056 - Starting bus service: MassTransit.Subscriptions.Coordinator.SubscriptionRouterService 
DEBUG(MassTransit.ServiceContainer) 1062 - Starting bus service: MassTransit.Subscriptions.SubscriptionBusService 
DEBUG(MassTransit.Threading.ThreadPoolConsumerPool) 1080 - Starting Consumer Pool for rabbitmq://localhost/groups 
[Topshelf.Quartz] Scheduled Job: DEFAULT.ea637337-950a-4281-99c0-f10b842814c9 
[Topshelf.Quartz] Job Schedule: Trigger 'DEFAULT.8a1d0b7c-d670-440b-974f-31ec8be6f294': triggerClass: 'Quartz.Impl.Triggers.SimpleTriggerImpl calendar: '' misfireInstruction: 0 nextFireTime: 01/28/2014 07:12:35 +00:00 - Next Fire Time (local): 28.01.2014 9:12:35 +02:00 
[Topshelf.Quartz] Scheduler started... 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1248 - CreatingRabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1250 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error 
DEBUG(Global) 1254 - Sending queue message 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1254 - RabbitMQconnection created: localhost:5672// 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1272 - CreatingRabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1273 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage 
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1277 - RabbitMQconnection created: localhost:5672// 
DEBUG(MassTransit.Messages) 1439 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-961c-08d0ea0f744a:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(MassTransit.Messages) 1441 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-8965-08d0ea0f744b:MyApp.Transit.SimpleTextMessage, MyApp 
The MyApp service is now running, press Control+C to exit. 

DEBUG(Global) 21212 - Sending queue message 
DEBUG(MassTransit.Messages) 21214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d4fb-08d0ea0f77c4:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 41213 - Sending queue message 
DEBUG(MassTransit.Messages) 41214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-077b-08d0ea0f7b40:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 61212 - Sending queue message 
DEBUG(MassTransit.Messages) 61214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-2bed-08d0ea0f7ebb:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 81213 - Sending queue message 
DEBUG(MassTransit.Messages) 81215 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-5f44-08d0ea0f8236:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 101212 - Sending queue message 
DEBUG(MassTransit.Messages) 101214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-80f8-08d0ea0f85b1:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 121212 - Sending queue message 
DEBUG(MassTransit.Messages) 121213 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-a971-08d0ea0f892c:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 141212 - Sending queue message 
DEBUG(MassTransit.Messages) 141214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d53e-08d0ea0f8ca7:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 161212 - Sending queue message 
DEBUG(MassTransit.Messages) 172109 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-7504-08d0ea0f9208:MyApp.Transit.SimpleTextMessage, MyApp 
DEBUG(Global) 181212 - Sending queue message 
DEBUG(MassTransit.Messages) 193461 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-dd26-08d0ea0f95bf:MyApp.Transit.SimpleTextMessage, MyApp 

ответ

4

Кажется, что самый простой способ - связать очередь python с обменом в управлении RabbitMq. После этого я успешно получил сообщения.

PyhonConsumer теперь выглядит следующим образом:

import pika 

print('Stating consumer') 

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

channel.queue_declare('python_consumer_1') 

print ' [*] Waiting for messages. To exit press CTRL+C' 

def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

channel.queue_bind(queue='python_consumer_1', exchange='MyApp.Transit:SimpleTextMessage') 
channel.basic_consume(callback, queue='python_consumer_1') 
channel.start_consuming() 
+0

Значит, все работает, и ты в порядке? –

+0

Да, мне удалось предоставить PoC. Спасибо, что подали идею. – ikutsin

3

Если вы собираетесь использовать сообщения с другого языка, вам нужно посмотреть, как создаются обмены, когда MassTransit публикует сообщение. Затем вам нужно будет привязать эти обмены к очередям, чтобы сообщения доставлялись вашим подписчикам.

Для вашего кода Python, вам нужно

exchange_bind(".....:SimpleTextMessage", "phython_consumer_1") 

После того, как вы сделали это, сообщения будут доставлены в очередь. Вы используете BSON, почему бы не использовать JSON или что-то, с чем работает python? Честно говоря, я не уверен, поддерживает ли Python BSON или нет, просто пытается предложить другие предложения.

+0

Я получаю сообщение об ошибке: ChannelClosed: (404, "NOT_FOUND - no exchange 'python_consumer_1' в vhost '/'") при попытке вызвать: channel.exchange_bind ('MyApp.Transit: SimpleTextMessage', 'python_consumer_1') перед ни после basic_consume .. Обновит вопрос следом от masstrasit. – ikutsin

+0

Я также удалил BSON serializer и попытался заменить его на JSON, но часть python все еще молчала и не могла получить сообщение. – ikutsin

1

Вы можете проверить существующие обмены RabbitMQ, очередей с помощью rabbitmqctl утилиты администратора, а затем играть от его результатов.