Алгоритм считывает данные из очереди Kafka с использованием KafkaSpout
.Потребление данных из очереди Kafka с использованием топологии Storm
я столкнулся следующее исключение:
Exception in thread "main" java.lang.IllegalStateException: Couldn't initialize the topology
at com.bridgera.iot.kafka.App.main(App.java:63)
Caused by: java.lang.IllegalArgumentException: Storm conf is not valid. Must be json-serializable
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:104)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70)
at com.bridgera.iot.kafka.App.main(App.java:60)
Мои Java-код:
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException
{
String nimbusHost = "localhost";
ZkHosts zkHosts=new ZkHosts("localhost:2181");
String topic_name="test";
String consumer_group_id="storm";
String zookeeper_root="";
SpoutConfig kafkaConfig=new SpoutConfig(zkHosts,
topic_name, zookeeper_root, consumer_group_id);
kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder=new TopologyBuilder();
//builder.setSpout("KafkaSpout", kafkaSpout, 1);
builder.setSpout("KafkaSpout", kafkaSpout);
builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
LocalCluster cluster=new LocalCluster();
try{
cluster.submitTopology("KafkaConsumerTopology", conf, builder.createTopology());
Thread.sleep(120000);
}catch (Exception e) {
throw new IllegalStateException("Couldn't initialize the topology", e);
}
}
Позвольте мне знать, что я здесь делаю неправильно в конфигурации. FYI: Я запускаю Zookeeper и Storm JVM (локальное исполнение) в кластере AWS.
Я не смог воспроизвести ошибку. Кстати: ваш код трассировки стека не соответствует: в вашем коде вы используете 'LocalCluster', но ваша трассировка стека говорит' StormSubmitter'. В этом случае это не удается? Вы пытались начать с пустой конфигурации и добавлять параметры один за другим, чтобы увидеть, какой из них дает проблему? –
@Sax я так не пробовал, я попробую сейчас. Я просто запускаю этот файл класса в кластере amazon aws. –