2013-03-28 3 views
1

Я новичок штурмовать я использую RabbitMQ в моем носике, что recieves кортежей из некоторой очереди и есть клиент работает под управлением одной другая машины, которая вставляет кортежи в эту очередь я побежал простой RabbitMQ пример программы, которая работает нормально, но когда я использую его в буре носик он Гест заблокирован наRabbitMQ блокировки при создании нового соединения в топологии штормового

connection = factory.newConnection(); 

хотя мой RabbitMQ сервер также работает и на той же машине, когда я запустить пример кода, он работает успешно. операторы печати печать на заявлении

System.out.println(" setting host to 192.168.8.218..... "); 

ниже мой полный класс носик.

package storm.starter.spout; 

import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 
import backtype.storm.utils.Utils; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import java.util.Map; 
import java.util.Random; 
import java.net.*; 
import java.io.*; 
import java.lang.Exception; 
import java.io.IOException; 

public class RabbitmqSpout extends BaseRichSpout { 
    SpoutOutputCollector _collector; 
    public final static String QUEUE_NAME = "record"; 
    ConnectionFactory factory; 
    Connection connection; 
    Channel channel; 
    QueueingConsumer consumer; 

    @Override 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
    { 
     _collector = collector; 
     System.out.println(" [*] Intilization of spout..... "); 

     try 
     { 
      factory = new ConnectionFactory(); 
      System.out.println(" creating connection factory..... "); 
      factory.setHost("192.168.8.96"); 
      System.out.println(" setting host to 192.168.8.218..... "); 
      connection = factory.newConnection(); 
      System.out.println(" creating new connection..... "); 
      channel = connection.createChannel(); 
      System.out.println(" creating new channel..... "); 
      channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
      System.out.println(" Declaring queue..... "); 
      System.out.println(" [*] Waiting for messages. "); 
     } 
     catch(Exception exception) 
     { 
      System.out.println("Exception occurred. "+exception.getMessage()); 
     } 

    } 

    @Override 
    public void nextTuple() 
    { 
     System.out.println("In wait of tuples.... "); 
     try 
     { 
      consumer = new QueueingConsumer(channel); 
      System.out.println(" trying to consume..... "); 
      channel.basicConsume(QUEUE_NAME, true, consumer); 

      while (true) 
      { 
       System.out.println(" trying to deliver..... "); 
       QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
       String message = new String(delivery.getBody()); 
       System.out.println(" getting string..... "); 
       System.out.println(" [x] Received '" + message + "'"); 
       System.out.print("emitting Rabbitmq Queue tuple"); 
       _collector.emit(new Values(message)); 
       System.out.print("emitted Rabbitmq Queue tuple"); 
      } 
     } 

     catch(IOException io) 
     { 
      System.out.println("Exception occurred. "); 
     } 
     catch(Exception exception) 
     { 
      System.out.println("Exception occurred. "); 
     } 


    }   

    @Override 
    public void ack(Object id) { 
    } 

    @Override 
    public void fail(Object id) 
    { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    { 
     declarer.declare(new Fields("record")); 
    } 

}

+0

Вы можете попробовать ливневый AMQP-носик для подключения к RabbitMQ https://github.com/Xorlev/storm-amqp-spout – Ilion

ответ