2016-12-25 8 views
1

Я пытаюсь использовать kafka's kafka.zk.EmbeddedZookeeper и kafka.server.KafkaServer, возвращенный kafka.utils.TestUtils/createServer, для запуска сервера kafka для тестирования.Неспособность создать встроенный брокер Kafka

Но я попадаю в блокпост, где пытается отправить сообщение, и возвращает ошибку KafkaProducer$Future. Ниже приведена версия кафки. И нижеприведенный код - Clojure, взаимодействующий с библиотекой Kafka.

[org.apache.kafka/kafka_2.11 "0.10.0.1"] 
[org.apache.kafka/kafka-clients "0.10.1.0"] 

Это как далеко.

  • Порт Zookeeper случайным образом назначен (см. here).
  • Может успешно создать Zookeeper server и подключиться к нему, используя netcat.
  • Может успешно создать тему.
  • Может успешно создать брокера Kafka и подключиться к нему, используя netcat.
  • Этап 5, где процесс выходит из строя.

Этот SO question предполагает, что важно передать правильный объект Time. Но MockTime выглядит как разумная реализация. Кто-то занимался этим раньше?

;; 1. Create Zookeeper 
(require '[clojure.test :refer :all] 
    '[kafkaesque.topics :as kt] 
    '[kafkaesque.utils :as ku] 
    '[clojure.pprint :refer [pprint]]) 

(import '[java.nio.file Files] 
    '[kafka.zk EmbeddedZookeeper] 
    '[kafka.server KafkaServer KafkaConfig] 
    '[kafka.utils TestUtils Time MockTime]) 

(def zk-config {:zkhost "127.0.0.1"}) 
(def topic-name "client-test") 
(def ^EmbeddedZookeeper zkServer (EmbeddedZookeeper.)) 


;; 2. Create Kafka Broker 
(def zk-connect-str (str "127.0.0.1" ":" (.port zkServer))) 
(def zku ((ZkUtils/apply (ZkUtils/createZkClient zk-connect-str 10000 8000) false))) 
(def brokerhost "127.0.0.1") 
(def brokerport "9092") 

(def ^KafkaConfig config (KafkaConfig. {"zookeeper.connect" zk-connect-str 
         "broker.id" "0" 
         "log.dirs" (.toString 
          (.toAbsolutePath 
          (Files/createTempDirectory 
          "kafka-" (make-array java.nio.file.attribute.FileAttribute 0)))) 
         "listeners" (str "PLAINTEXT://" brokerhost ":" brokerport)})) 

(def ^Time mock (MockTime.)) 
(def ^KafkaServer kafkaServer (TestUtils/createServer config mock)) 


;; 3. Create a Topic 
(kt/create! zku topic-name 1 1 {}) 
(kt/topic-exists? zku topic-name) ;; returns true 


;; 4. Create a Producer and ProducerRecord 
(def producer-a (kc/producer {"bootstrap.servers" "127.0.0.1:9092" 
       "acks"    "all" 
       "retries"   "0" 
       "batch.size"  "16384" 
       "linger.ms"   "1" 
       "buffer.memory"  "33554432" 
       "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" 
       "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})) 

(def message-key "k1") 
(def message-value "foobar") 
(def record-a (kc/producer-record topic-name 0 message-key message-value)) 


;; 5. Send a message 
(def send-result (kc/send! producer-a record-a)) ;; Times out, and returns a KafkaProducer$Future failure. 
+0

Нельзя указывать экземпляр Time для TestUtils.createServer, так как он будет создавать по умолчанию. Что касается «тайм-аутов», не могли бы вы подтвердить, что он говорит, что время ожидания метаданных? Кроме того, я замечаю, что версии для клиента и сервера не совпадают. Не могли бы вы использовать Кафку из той же версии, чтобы повторить попытку? – amethystic

+0

@amethystic Crikey, вот и все - несоответствие версии. * org.apache.kafka/kafka_2.11 * ** "0.10.0.1" **, против * org.apache.kafka/kafka-clients * ** "0.10.1.0" **. Боже, я вытаскиваю волосы! Я переместил все версии на «0.10.1.0» **, и все работает. Приветствия :) – Nutritioustim

ответ

0

Спасибо идет к @amethystic за указание на мою дислексии :) Проблема была сбоями. Я использую org.apache.kafka/kafka_2.11"0.10.0.1" с org.apache.kafka/Кафка-клиентов"0.10.1.0".

Я переместил все версии в «0.10.1.0», и все работает так, как ожидалось.

Надеюсь, это поможет.

 Смежные вопросы

  • Нет связанных вопросов^_^