Я новичок в kafka и бурю. Я пытался реализовать Java-пример, который объединяет Кафку и шторм. Я нашел пример в Интернете. Я пытаюсь запустить java-программу в Eclipse IDE. Я не использую maven.Интеграция Kafka Storm с java. kafka.api.OffsetRequest.DefaultClientId() Ljava/языки/String; Ошибка
У меня есть storm-kafka-0.10.0.jar
, kafka-0.6.jar
, scala-library-2.10.3.jar
и storm-core-0.10.0.jar
как внешние банки.
Вот мой код Java.
KafkaStormSample.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.ZkHosts;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.StringScheme;
public class KafkaStormSample {
public static void main(String[] args) throws Exception{
Config config = new Config();
config.setDebug(true);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
String zkConnString = "localhost:2181";
String topic = "my-first-topic";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,
UUID.randomUUID().toString());
kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
//kafkaSpoutConfig.forceFromStart = true;
kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
//builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KafkaStormSample", config, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
CountBolt.java
import java.util.Map;
import java.util.HashMap;
import backtype.storm.tuple.Tuple;
import backtype.storm.task.OutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else {
Integer c = counters.get(str) +1;
counters.put(str, c);
}
collector.ack(input);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counters.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
Когда я пытаюсь запустить kafkaStormSample.java я получаю ошибку ниже.
Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String;
at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43)
at storm.kafka.SpoutConfig.<init>(SpoutConfig.java:40)
at KafkaStormSample.main(KafkaStormSample.java:23)
Я убедился, что у меня есть все необходимые банки. Но все же я думаю, что мне не хватает кувшина.
Любая помощь будет оценена по достоинству.
Спасибо!