2013-06-27 3 views
5

Я использую KafkaSpout. Пожалуйста, найдите тестовую программу ниже.Kafka Storm Интеграция с использованием Kafka Spout

Я использую Storm 0.8.1. Класс Multischeme существует в Storm 0.8.2. Я буду использовать это. Я просто хочу знать, как более ранние версии работали, создавая экземпляр класса StringScheme()? Где можно скачать более ранние версии Kafka Spout? Но я сомневаюсь, что это была бы правильная альтернатива, чем работа над Storm 0.8.2. ??? (Confused)

Когда я запускаю код (приведенный ниже) на кластере шторма (т.е. когда я нажимаю свою топологию), я получаю следующую ошибку (это происходит, когда часть Scheme комментируется иначе, конечно, я получу ошибку компилятора как класс не существует в 0.8.1):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme 
     at storm.kafka.TestTopology.main(TestTopology.java:37) 
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme 

в коде, приведенном ниже, Вы можете найти spoutConfig.scheme = новый StringScheme(); часть прокомментировала. Я получал ошибку компилятора, если я не комментирую эту строку, но это естественно, потому что там нет конструкторов. Также, когда я создаю экземпляр MultiScheme, я получаю ошибку, так как у меня нет этого класса в 0.8.1.

public class TestTopology { 
    public static class PrinterBolt extends BaseBasicBolt { 
     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
     } 

     public void execute(Tuple tuple, BasicOutputCollector collector) { 
      System.out.println(tuple.toString()); 
     } 
    } 

    public static void main(String [] args) throws Exception { 
     List<HostPort> hosts = new ArrayList<HostPort>(); 
     hosts.add(new HostPort("127.0.0.1",9092)); 
     LocalCluster cluster = new LocalCluster(); 
     TopologyBuilder builder = new TopologyBuilder(); 
     SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID"); 
     spoutConfig.zkServers=ImmutableList.of("localhost"); 
     spoutConfig.zkPort=2181; 
     //spoutConfig.scheme=new StringScheme(); 
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     builder.setSpout("spout",new KafkaSpout(spoutConfig)); 
     builder.setBolt("printer", new PrinterBolt()) 
       .shuffleGrouping("spout"); 
     Config config = new Config(); 

     cluster.submitTopology("kafka-test", config, builder.createTopology()); 

     Thread.sleep(600000); 
    } 
+0

Я думаю, Я не понимаю проблему: это просто работает, если вы перейдете к 0.8.2? Если это так, зачем даже пытаться запустить в 0.8.1: 0.8.2 заменяет его некоторыми исправлениями ошибок и другими улучшениями. –

ответ

8

У меня была та же проблема. Наконец, разрешил его, и я поставил полный пример работы на github.

Вы можете проверить его здесь> https://github.com/buildlackey/cep

(нажмите на каталог штормовых + Кафка для примера программы, которая должна получить вас и работает).

+8

Подумайте о том, чтобы добавить предложение или два к вашему ответу, чтобы описать, что вы сделали, чтобы ваш ответ был релевантным, не полагаясь на то, что этот репозиторий Git активен. – neontapir

+0

Уверенный: проект содержит модульные тесты и примеры программ, которые иллюстрируют, как разрабатывать приложения обработки сложных событий (CEP) поверх Storm, Kafka и Esper. –

+0

звучит хорошо для меня –

5

У нас была аналогичная проблема.

Наше решение:

  1. Открыть pom.xml

  2. Изменение область от предоставлена ​​<scope>compile</scope>

Если вы хотите узнать больше о прицелы зависимостей проверить Maven доку: Maven docu - dependency scopes

 Смежные вопросы

  • Нет связанных вопросов^_^