1

Алгоритм считывает данные из очереди Kafka с использованием KafkaSpout.Потребление данных из очереди Kafka с использованием топологии Storm

я столкнулся следующее исключение:

Exception in thread "main" java.lang.IllegalStateException: Couldn't initialize the topology 
    at com.bridgera.iot.kafka.App.main(App.java:63) 
Caused by: java.lang.IllegalArgumentException: Storm conf is not valid. Must be json-serializable 
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:104) 
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:70) 
    at com.bridgera.iot.kafka.App.main(App.java:60) 

Мои Java-код:

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException 
{ 
    String nimbusHost = "localhost"; 
    ZkHosts zkHosts=new ZkHosts("localhost:2181"); 
    String topic_name="test"; 
    String consumer_group_id="storm"; 
    String zookeeper_root=""; 
    SpoutConfig kafkaConfig=new SpoutConfig(zkHosts, 
      topic_name, zookeeper_root, consumer_group_id); 
    kafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme()); 
    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 
    TopologyBuilder builder=new TopologyBuilder(); 
    //builder.setSpout("KafkaSpout", kafkaSpout, 1); 
    builder.setSpout("KafkaSpout", kafkaSpout); 
    builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout"); 
    Map<String, Object> conf = new HashMap<String, Object>(); 
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181); 
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost")); 
    conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000); 
    conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000); 
    conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3); 
    conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30); 
    LocalCluster cluster=new LocalCluster(); 
    try{ 
     cluster.submitTopology("KafkaConsumerTopology", conf, builder.createTopology()); 
     Thread.sleep(120000); 
    }catch (Exception e) { 
     throw new IllegalStateException("Couldn't initialize the topology", e); 
    } 
} 

Позвольте мне знать, что я здесь делаю неправильно в конфигурации. FYI: Я запускаю Zookeeper и Storm JVM (локальное исполнение) в кластере AWS.

+0

Я не смог воспроизвести ошибку. Кстати: ваш код трассировки стека не соответствует: в вашем коде вы используете 'LocalCluster', но ваша трассировка стека говорит' StormSubmitter'. В этом случае это не удается? Вы пытались начать с пустой конфигурации и добавлять параметры один за другим, чтобы увидеть, какой из них дает проблему? –

+0

@Sax я так не пробовал, я попробую сейчас. Я просто запускаю этот файл класса в кластере amazon aws. –

ответ

0

Попробуйте это:

List<String> hosts = Arrays.asList("localhost") 
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, hosts); 

Я думаю, что проблема с java.util.AbstractList<Object> вернулся из Arrays.asList("localhost") и синтаксического анализа в формате JSON.

+0

Нет такой же ошибки. –