Я использую KafkaSpout. Пожалуйста, найдите тестовую программу ниже.Kafka Storm Интеграция с использованием Kafka Spout
Я использую Storm 0.8.1. Класс Multischeme существует в Storm 0.8.2. Я буду использовать это. Я просто хочу знать, как более ранние версии работали, создавая экземпляр класса StringScheme()? Где можно скачать более ранние версии Kafka Spout? Но я сомневаюсь, что это была бы правильная альтернатива, чем работа над Storm 0.8.2. ??? (Confused)
Когда я запускаю код (приведенный ниже) на кластере шторма (т.е. когда я нажимаю свою топологию), я получаю следующую ошибку (это происходит, когда часть Scheme комментируется иначе, конечно, я получу ошибку компилятора как класс не существует в 0.8.1):
java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
в коде, приведенном ниже, Вы можете найти spoutConfig.scheme = новый StringScheme(); часть прокомментировала. Я получал ошибку компилятора, если я не комментирую эту строку, но это естественно, потому что там нет конструкторов. Также, когда я создаю экземпляр MultiScheme, я получаю ошибку, так как у меня нет этого класса в 0.8.1.
public class TestTopology {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.toString());
}
}
public static void main(String [] args) throws Exception {
List<HostPort> hosts = new ArrayList<HostPort>();
hosts.add(new HostPort("127.0.0.1",9092));
LocalCluster cluster = new LocalCluster();
TopologyBuilder builder = new TopologyBuilder();
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
spoutConfig.zkServers=ImmutableList.of("localhost");
spoutConfig.zkPort=2181;
//spoutConfig.scheme=new StringScheme();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout",new KafkaSpout(spoutConfig));
builder.setBolt("printer", new PrinterBolt())
.shuffleGrouping("spout");
Config config = new Config();
cluster.submitTopology("kafka-test", config, builder.createTopology());
Thread.sleep(600000);
}
Я думаю, Я не понимаю проблему: это просто работает, если вы перейдете к 0.8.2? Если это так, зачем даже пытаться запустить в 0.8.1: 0.8.2 заменяет его некоторыми исправлениями ошибок и другими улучшениями. –