Я новичок в Scala и Kafka, и у меня возникли проблемы.Kafka производитель создает тему, но не умеет отправлять сообщения
Я пытаюсь подключить производитель scala kafka к серверу kafka, который установлен на сервере cloudera express. Я сделал это уже один раз в виртуальных машинах с these instructions и не имел никаких проблем.
Когда я запускаю продюсер, создается желаемая тема, но ни одно сообщение не отправляется, или я так думаю.
Здесь не следует часть кода:
производителя Кафки
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaProducerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("acks", "all")
props.put("retries", "2")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("block.on.buffer.full", "true")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("auto.create.topics.enable", "true")
val producer = new KafkaProducer[String, String](props)
def startCounter() {
println("Start Producer Counter")
for (i <- 1 to 100) {
producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
println("Producer - Send: " + i)
}
println("Closing producer")
producer.close()
}
}
Когда я выполнить метод запуска, я вижу «Producer - Отправить: #», как выход из этого и я не ошибки. Итак, я предполагаю, что этот кусок кода отправил сообщения в Кафку.
Я начал следующий на сервере Кафку, прежде чем я побежал производитель:
kafka-console-consumer --zookeeper zk:2181 --topic test-counter
Но вот я вижу, ничего не происходит.
Когда я проверяю тему, которую производитель должен создать, находится в списке.
kafka-topics -zookeeper zk:2181 --list
У меня также есть аналогичная проблема с потребителем:
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
class KafkaConsumerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("group.id", "testGroup")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("session.timeout.ms", "3000")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)
val consumer = new KafkaConsumer[String, String](props)
def start() {
println("Start Consumer")
consumer.subscribe(Arrays.asList("test-counter"))
while (true) {
val records = consumer.poll(100)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
}
}
}
}
При создании сообщения на сервере через Кафка-консольной-производителя я вижу их появляются в Кафка-консоли-потребителей на окружающую сервер, но не в потребитель, которого я написал.
kafka-console-producer --broker-list ks:9092 --topic test-counter
KafkaServer.ZOOKEEPER_ADDRESS такое же, как аргумент гк: 2181 с Кафкой-консольным-потребителем и KafkaServer.KAFKA_ADDRESS таким же, как аргумент кс: 9092 с Кафкой-консольным-производителем.
Я попробую это немедленно –
С этими изменениями проблема все еще существует. Но теперь я могу видеть, что это причина: 'java.net.ConnectException: время соединения завершено: никакой дополнительной информации. По какой-то причине производитель и потребитель не могут подключиться к ks: 9092. –
Я пробовал этот код с другим сервером kafka, и там он отлично работает. Кажется, это проблема с нашим kafkaserver на cloudera и конфигурации. –