Я использую kafka 0.10.1.1 и storm 1.0.2. В документации по бури для интеграции kafka я вижу, что смещения все еще поддерживаются с использованием zookeeper, поскольку мы инициализируем kafka spout с помощью серверов zookeeper. Как я могу загружать носик с помощью серверов kafka. Есть ли для этого пример. Пример из штормовых DocsKafka spout интеграция
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Этот вариант с использованием зоопарка работает отлично и потребляют сообщения. но я не мог видеть группу потребителей или штормовых узлов как потребителей в kafkamanager ui.
Альтернативный подход - это.
KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
String[] topics = new String[1];
topics[0] = topicName;
KafkaSpoutStreams kafkaSpoutStreams =
new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();
KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();
KafkaSpoutConfig<String, String> spoutConf =
new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();
return spoutConf;
}
Но это решение показывает CommitFailedException после прочтения нескольких сообщений от kafka.