2016-03-15 1 views
0

У меня есть веб-служба SOAP, которая отправляет сообщение с запросом kafka и ждет ответного сообщения kafka (например, consumer.poll (10000)).Почему Kafka Consumer продолжает получать те же сообщения (смещение)

Каждый раз, когда веб-сервис называется, он создает нового продюсера Kafka и нового потребителя Kafka.

Каждый раз, когда я вызываю веб-службу, потребитель получает те же сообщения (например, сообщения с тем же самым смещением).

Я использую Kafka 0.9 и включаю автоматическую фиксацию и частоту автоматического фиксации 100 мс.

для каждого потребительского регистра, возвращаемого методом poll(), который обрабатывается в пределах его собственной Callable, например.

ConsumerRecords<String, String> records = consumer.poll(200); 

for (ConsumerRecord<String, String> record : records) { 

final Handler handler = new Handler(consumerRecord); 
      executor.submit(handler); 

} 

Почему я продолжаю получать одни и те же сообщения снова и снова?

UPDATE 0001

metric.reporters = [] 
metadata.max.age.ms = 300000 
value.deserializer = class com.kafka.MDCDeserializer 
group.id = group-A.group 
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
reconnect.backoff.ms = 50 
sasl.kerberos.ticket.renew.window.factor = 0.8 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667] 
retry.backoff.ms = 100 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.service.name = kafka 
sasl.kerberos.ticket.renew.jitter = 0.05 
ssl.keystore.type = JKS 
ssl.trustmanager.algorithm = PKIX 
enable.auto.commit = true 
ssl.key.password = null 
fetch.max.wait.ms = 500 
sasl.kerberos.min.time.before.relogin = 60000 
connections.max.idle.ms = 540000 
ssl.truststore.password = null 
session.timeout.ms = 30000 
metrics.num.samples = 2 
client.id = 
ssl.endpoint.identification.algorithm = null 
key.deserializer = class com.kafka.UUIDDerializer 
ssl.protocol = TLS 
check.crcs = true 
request.timeout.ms = 40000 
ssl.provider = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.keystore.location = null 
heartbeat.interval.ms = 3000 
auto.commit.interval.ms = 5000 
receive.buffer.bytes = 32768 
ssl.cipher.suites = null 
ssl.truststore.type = JKS 
security.protocol = PLAINTEXTSASL 
ssl.truststore.location = null 
ssl.keystore.password = null 
ssl.keymanager.algorithm = IbmX509 
metrics.sample.window.ms = 30000 
fetch.min.bytes = 1024 
send.buffer.bytes = 131072 
auto.offset.reset = latest 
+0

Возможно ли, что вы настроили потребителя kafka всегда с наименьшим смещением? –

ответ

3

Основание на код, который вы показываете. Я думаю, что ваша проблема в том, что новый потребитель однопоточный. Если вы опросили один раз, а затем не выполните другой опрос, то auto.commit.offset не будет работать. Попробуйте поместить свой код в цикл while и посмотреть, будет ли когда вы снова вызываете опрос, смещение совершено.

+0

хорошее место !, как только мы сделали цикл, чтобы вызвать опрос несколько раз, все началось, как и ожидалось. Большое спасибо за ваше время – Hector

+0

без проблем hector, если http://stackoverflow.com/questions/35991849/how-does-should-kafka-consumer-cope-with-poison-messages/35993439#35993439 был также полезен для вас подумайте о его принятии. – Nautilus