2017-01-28 8 views
0

спотыкаясь на основы с грузчиком и Кафки, не могу получить соединение клиентане может подключиться к Spotify Кафка контейнера, основные проблемы с подключением

, что я сделал до сих пор

1) установленные Докер окна на окна 10. 2) открыть kitematic, и искал kafka, и выбрал spotify kafka (изображение wurstmeister не запустилось).
3) контейнер загорается, и я вижу изображение, работающее в журналах контейнера.
4) IP и порты сообщают Docker порта 9092 - и порт доступа в качестве локального хоста: 32768

докера пс показывает этот 7bf9f9278e64 Spotify/Кафку последний "supervisord -n" 2 часа назад на 57 минут 0.0.0.0:32769 -> 2181/TCP, 0.0.0.0:32768->9092/tcp Кафка

докер-машина активна, не возвращается ни один активный HOST

мой заводной класс (вид вырезать и вставить из примера setsup соединения, как это

class KafkaProducer { 

    String topicName = "wills topic" 
    Producer<String, String> producer  
def init() { 
    Properties props = new Properties() 
    props.put("bootstrap.servers", "192.168.1.89:32768") //Assign localhost id and external port (9092 int) 
    props.put("acks", "all")       //Set acknowledgements for producer requests. 
    props.put("retries", 0)        //If the request fails, the producer can automatically retry, 
    props.put("batch.size", 16384)      //Specify buffer size in config 
    props.put("linger.ms", 1)       //Reduce the no of requests less than 0 
    props.put("buffer.memory", 33554432)    //The buffer.memory controls the total amount of memory available to the producer for buffering. 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 

    producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props) 
} .... 

, когда я запускаю этот init, я получаю ошибки, говоря, что он не может разрешить соединение, для java.io.IOException: не удается разрешить адрес: 7bf9f9278e64: 9092, который является внутренним портом контейнера. (мой сценарий звонит из моей обычной среды рабочего стола IDE)

kitmatic говорит, что это сопоставление. так почему я не могу подключиться, а затем отправить? Также как я просто загружаю через kitematic, где можно поставить docker-compose.yml, если вы хотите изменить конфигурацию. На самом деле неясно, где это сделать.

18:05:41.022 [main] INFO o.a.k.c.p.ProducerConfig:[.logAll:] > ProducerConfig values: 
    acks = all 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [192.168.1.89:32768] 
    buffer.memory = 33554432 
    client.id = 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    linger.ms = 1 
    max.block.ms = 60000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 30000 
    retries = 0 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer 

18:05:41.076 [main] INFO o.a.k.c.p.ProducerConfig:[.logAll:] > ProducerConfig values: 
    acks = all 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [192.168.1.89:32768] 
    buffer.memory = 33554432 
    client.id = producer-1 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    linger.ms = 1 
    max.block.ms = 60000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 30000 
    retries = 0 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer 

18:05:41.079 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bufferpool-wait-time 
18:05:41.083 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name buffer-exhausted-records 
18:05:41.085 [main] DEBUG o.a.k.c.Metadata:[.update:] > Updated cluster metadata version 1 to Cluster(id = null, nodes = [192.168.1.89:32768 (id: -1 rack: null)], partitions = []) 
18:05:41.401 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name connections-closed: 
18:05:41.401 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name connections-created: 
18:05:41.402 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-sent-received: 
18:05:41.402 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-sent: 
18:05:41.406 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name bytes-received: 
18:05:41.406 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name select-time: 
18:05:41.407 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name io-time: 
18:05:41.409 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name batch-size 
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name compression-rate 
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name queue-time 
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name request-time 
18:05:41.410 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name produce-throttle-time 
18:05:41.411 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name records-per-request 
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name record-retries 
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name errors 
18:05:41.412 [main] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name record-size-max 
18:05:41.414 [main] WARN o.a.k.c.p.ProducerConfig:[.logUnused:] > The configuration 'key.deserializer' was supplied but isn't a known config. 
18:05:41.414 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.p.i.Sender:[.run:] > Starting Kafka producer I/O thread. 
18:05:41.414 [main] WARN o.a.k.c.p.ProducerConfig:[.logUnused:] > The configuration 'value.deserializer' was supplied but isn't a known config. 
18:05:41.416 [main] INFO o.a.k.c.u.AppInfoParser:[.<init>:] > Kafka version : 0.10.1.1 
18:05:41.416 [main] INFO o.a.k.c.u.AppInfoParser:[.<init>:] > Kafka commitId : f10ef2720b03b247 
18:05:41.417 [main] DEBUG o.a.k.c.p.KafkaProducer:[.<init>:] > Kafka producer started 
18:05:41.430 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Initialize connection to node -1 for sending metadata request 
18:05:41.430 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Initiating connection to node -1 at 192.168.1.89:32768. 
18:05:41.434 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.bytes-sent 
18:05:41.434 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.bytes-received 
18:05:41.435 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.m.Metrics:[.sensor:] > Added sensor with name node--1.latency 
18:05:41.435 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.n.Selector:[.pollSelectionKeys:] > Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 
18:05:41.436 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.handleConnections:] > Completed connection to node -1 
18:05:41.452 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Sending metadata request {topics=[wills topic]} to node -1 
18:05:41.476 [kafka-producer-network-thread | producer-1] WARN o.a.k.c.NetworkClient:[.handleResponse:] > Error while fetching metadata with correlation id 0 : {wills topic=INVALID_TOPIC_EXCEPTION} 
18:05:41.477 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.Metadata:[.update:] > Updated cluster metadata version 2 to Cluster(id = 8cjV2Ga6RB6bXfeDWWfTKA, nodes = [7bf9f9278e64:9092 (id: 0 rack: null)], partitions = []) 
18:05:41.570 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.maybeUpdate:] > Initialize connection to node 0 for sending metadata request 
18:05:41.570 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Initiating connection to node 0 at 7bf9f9278e64:9092. 
18:05:43.826 [kafka-producer-network-thread | producer-1] DEBUG o.a.k.c.NetworkClient:[.initiateConnect:] > Error connecting to node 0 at 7bf9f9278e64:9092: 
java.io.IOException: Can't resolve address: 7bf9f9278e64:9092 
    at org.apache.kafka.common.network.Selector.connect(Selector.java:180) 
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:498) 
    at org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:48) 
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:645) 
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:552) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:258) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236) 
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135) 
    at java.lang.Thread.run(Thread.java:745) 

ценят помощь, чтобы меня за это первое препятствие

ответ

1

Try установить --env ADVERTISED_HOST=192.168.1.89 и --env ADVERTISED_PORT=32768 при запуске контейнера. Это необходимо, потому что по умолчанию Kafka рекламирует имя локального хоста (которое является именем хоста контейнера, например 7bf9f9278e64), и это недоступно с хоста. Когда вы используете привязку к порту, вам необходимо рекламировать ваш IP-адрес хоста (например, 192.168.1.89) и отображаемый порт (например, 32768).

+0

Возможно, это так. Кроме того, вы можете передать '-p 9092: 9092 -p 2181: 2181', поскольку командная строка указывает на' docker run', и вы не будете иметь порты, перемещающиеся по вам. – mkasberg

+0

пропустил kitematic, поскольку я не могу контролировать, как его начали. Я запустил команду запуска с консоли powershell следующим образом: «docker run --detach --name kafka -p 2181: 2181 -p 9092: 9092 --env ADVERTISED_HOST = 192.168.1.89 --env ADVERVTISED_PORT = 9092 spotify/kafka" (после того, как я удалил предыдущий контейнер). он, кажется, начинает нормально, и «докерные журналы кафки» говорят об этом. Затем я запускаю своего клиента и выглядит более здоровым, но повторяет «WARN o.a.k.c.NetworkClient: [. HandleResponse:]> Ошибка при извлечении метаданных с идентификатором корреляции 1: {wills topic = INVALID_TOPIC_EXCEPTION}». –

+0

Как вы общаетесь с процессом kafka, когда работаете с помощью команд докеров. есть этот комментарий на сайте kafka "bin/kafka-topics.sh --create --zookeeper localhost: 2181 - коэффициент повторения 1 - разделы 1 - топический тест", но для этого требуется, чтобы я мог общаться с отдельной кафкой и запустите команду оболочки. как вы «разговариваете» с отсоединенным контейнером через докер? –