Я пытаюсь использовать kafka's kafka.zk.EmbeddedZookeeper
и kafka.server.KafkaServer
, возвращенный kafka.utils.TestUtils/createServer
, для запуска сервера kafka для тестирования.Неспособность создать встроенный брокер Kafka
Но я попадаю в блокпост, где пытается отправить сообщение, и возвращает ошибку KafkaProducer$Future
. Ниже приведена версия кафки. И нижеприведенный код - Clojure, взаимодействующий с библиотекой Kafka.
[org.apache.kafka/kafka_2.11 "0.10.0.1"]
[org.apache.kafka/kafka-clients "0.10.1.0"]
Это как далеко.
- Порт Zookeeper случайным образом назначен (см. here).
- Может успешно создать Zookeeper server и подключиться к нему, используя netcat.
- Может успешно создать тему.
- Может успешно создать брокера Kafka и подключиться к нему, используя netcat.
- Этап 5, где процесс выходит из строя.
Этот SO question предполагает, что важно передать правильный объект Time
. Но MockTime выглядит как разумная реализация. Кто-то занимался этим раньше?
;; 1. Create Zookeeper
(require '[clojure.test :refer :all]
'[kafkaesque.topics :as kt]
'[kafkaesque.utils :as ku]
'[clojure.pprint :refer [pprint]])
(import '[java.nio.file Files]
'[kafka.zk EmbeddedZookeeper]
'[kafka.server KafkaServer KafkaConfig]
'[kafka.utils TestUtils Time MockTime])
(def zk-config {:zkhost "127.0.0.1"})
(def topic-name "client-test")
(def ^EmbeddedZookeeper zkServer (EmbeddedZookeeper.))
;; 2. Create Kafka Broker
(def zk-connect-str (str "127.0.0.1" ":" (.port zkServer)))
(def zku ((ZkUtils/apply (ZkUtils/createZkClient zk-connect-str 10000 8000) false)))
(def brokerhost "127.0.0.1")
(def brokerport "9092")
(def ^KafkaConfig config (KafkaConfig. {"zookeeper.connect" zk-connect-str
"broker.id" "0"
"log.dirs" (.toString
(.toAbsolutePath
(Files/createTempDirectory
"kafka-" (make-array java.nio.file.attribute.FileAttribute 0))))
"listeners" (str "PLAINTEXT://" brokerhost ":" brokerport)}))
(def ^Time mock (MockTime.))
(def ^KafkaServer kafkaServer (TestUtils/createServer config mock))
;; 3. Create a Topic
(kt/create! zku topic-name 1 1 {})
(kt/topic-exists? zku topic-name) ;; returns true
;; 4. Create a Producer and ProducerRecord
(def producer-a (kc/producer {"bootstrap.servers" "127.0.0.1:9092"
"acks" "all"
"retries" "0"
"batch.size" "16384"
"linger.ms" "1"
"buffer.memory" "33554432"
"key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}))
(def message-key "k1")
(def message-value "foobar")
(def record-a (kc/producer-record topic-name 0 message-key message-value))
;; 5. Send a message
(def send-result (kc/send! producer-a record-a)) ;; Times out, and returns a KafkaProducer$Future failure.
Нельзя указывать экземпляр Time для TestUtils.createServer, так как он будет создавать по умолчанию. Что касается «тайм-аутов», не могли бы вы подтвердить, что он говорит, что время ожидания метаданных? Кроме того, я замечаю, что версии для клиента и сервера не совпадают. Не могли бы вы использовать Кафку из той же версии, чтобы повторить попытку? – amethystic
@amethystic Crikey, вот и все - несоответствие версии. * org.apache.kafka/kafka_2.11 * ** "0.10.0.1" **, против * org.apache.kafka/kafka-clients * ** "0.10.1.0" **. Боже, я вытаскиваю волосы! Я переместил все версии на «0.10.1.0» **, и все работает. Приветствия :) – Nutritioustim