Я новичок штурмовать я использую RabbitMQ в моем носике, что recieves кортежей из некоторой очереди и есть клиент работает под управлением одной другая машины, которая вставляет кортежи в эту очередь я побежал простой RabbitMQ пример программы, которая работает нормально, но когда я использую его в буре носик он Гест заблокирован наRabbitMQ блокировки при создании нового соединения в топологии штормового
connection = factory.newConnection();
хотя мой RabbitMQ сервер также работает и на той же машине, когда я запустить пример кода, он работает успешно. операторы печати печать на заявлении
System.out.println(" setting host to 192.168.8.218..... ");
ниже мой полный класс носик.
package storm.starter.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import java.util.Map;
import java.util.Random;
import java.net.*;
import java.io.*;
import java.lang.Exception;
import java.io.IOException;
public class RabbitmqSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
public final static String QUEUE_NAME = "record";
ConnectionFactory factory;
Connection connection;
Channel channel;
QueueingConsumer consumer;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
_collector = collector;
System.out.println(" [*] Intilization of spout..... ");
try
{
factory = new ConnectionFactory();
System.out.println(" creating connection factory..... ");
factory.setHost("192.168.8.96");
System.out.println(" setting host to 192.168.8.218..... ");
connection = factory.newConnection();
System.out.println(" creating new connection..... ");
channel = connection.createChannel();
System.out.println(" creating new channel..... ");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" Declaring queue..... ");
System.out.println(" [*] Waiting for messages. ");
}
catch(Exception exception)
{
System.out.println("Exception occurred. "+exception.getMessage());
}
}
@Override
public void nextTuple()
{
System.out.println("In wait of tuples.... ");
try
{
consumer = new QueueingConsumer(channel);
System.out.println(" trying to consume..... ");
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
System.out.println(" trying to deliver..... ");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" getting string..... ");
System.out.println(" [x] Received '" + message + "'");
System.out.print("emitting Rabbitmq Queue tuple");
_collector.emit(new Values(message));
System.out.print("emitted Rabbitmq Queue tuple");
}
}
catch(IOException io)
{
System.out.println("Exception occurred. ");
}
catch(Exception exception)
{
System.out.println("Exception occurred. ");
}
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id)
{
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields("record"));
}
}
Вы можете попробовать ливневый AMQP-носик для подключения к RabbitMQ https://github.com/Xorlev/storm-amqp-spout – Ilion