2016-11-14 5 views
0

Весной-Кафкой я хочу пересмотреть тему Кафки с самого начала. Делая это, изменив group.id к чему-то неизвестному до Кафки конечно работает:reconsume Kafka 0.10.1 тема весна

@KafkaListener(topics = "sensordata.t") 
public void receiveMessage(String message) { 
... 
} 

@Bean 
public Map consumerConfigs() { 
    Map props = new HashMap<>(); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID"); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //it still commits though... 
    return props; 
} 

Однако, начиная с более, установив смещение 0 не удается.

@KafkaListener(topicPartitions = 
{ @TopicPartition(topic = "sensordata.t", 
     partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}) 
public void receiveMessage(String message) { 
... 
} 

@Bean 
public Map consumerConfigs() { 
    Map props = new HashMap<>(); 
    ... 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID"); 
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); //making timeout window larger seems to have no influence 
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); //setting max records to 1 makes no difference 
    return props; 
} 

Я получаю ошибку:

2016-11-14 14:07:59.018 INFO 8165 --- [   main] c.i.t.s.server.SpringKafkaApplication : Started SpringKafkaApplication in 4.134 seconds (JVM running for 4.745) 
2016-11-14 14:07:59.125 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8. 
2016-11-14 14:07:59.125 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8. 
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group spring8 
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group spring8 
2016-11-14 14:07:59.338 ERROR 8165 --- [afka-consumer-1] essageListenerContainer$ListenerConsumer : Container exception 

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059) ~[kafka-clients-0.10.0.1.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:939) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:816) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:526) ~[spring-kafka-1.1.1.RELEASE.jar:na] 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_92] 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_92] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92] 

Любой, кто знаком с этим?

Я использую Kafka 0.10.1.0 и

<properties> 
    <java.version>1.8</java.version> 
    <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version> 
</properties> 

ответ

0

Почему вы решили, что это проблема смещения 0? Вашего StackTrace говорит, что у вас есть больше pollTimeout длиннее session.timeout.ms:

Commit не может быть завершен, поскольку группа уже балансировки и назначены разделы другого члена. Это означает, что время между последующими вызовами poll() было больше, чем сконфигурированное session.timeout.ms, что обычно подразумевает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете решить эту проблему либо путем увеличения таймаута сеанса, либо путем уменьшения максимального размера партий, возвращаемых в poll(), с помощью max.poll.records.

Как правильно отрегулировать их?

+0

У меня нет. Но я устанавливаю таймаут до 10 секунд и максимальный размер партии до 1, и эта ошибка появляется в течение нескольких секунд (<10). – Raf

+0

Кроме того, отсутствие искажения со смещением не приводит к ошибке. Нет необходимости настраивать время ожидания пакетной обработки. – Raf

+0

ОК. Было бы лучше, если бы вы придумали какое-то простое приложение Spring Boot для игры с нашей стороны. Ваша stacktrace не связана с 'initPartitionsIfNeeded()', поэтому непонятно, как '0' может влиять на регулярные' poll() 'и' commitSync() ' –