Весной-Кафкой я хочу пересмотреть тему Кафки с самого начала. Делая это, изменив group.id
к чему-то неизвестному до Кафки конечно работает:reconsume Kafka 0.10.1 тема весна
@KafkaListener(topics = "sensordata.t")
public void receiveMessage(String message) {
...
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //it still commits though...
return props;
}
Однако, начиная с более, установив смещение 0 не удается.
@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "sensordata.t",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void receiveMessage(String message) {
...
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewGroupID");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); //making timeout window larger seems to have no influence
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); //setting max records to 1 makes no difference
return props;
}
Я получаю ошибку:
2016-11-14 14:07:59.018 INFO 8165 --- [ main] c.i.t.s.server.SpringKafkaApplication : Started SpringKafkaApplication in 4.134 seconds (JVM running for 4.745)
2016-11-14 14:07:59.125 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.125 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bto:9092 (id: 2147483647 rack: null) for group spring8.
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group spring8
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-11-14 14:07:59.129 INFO 8165 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group spring8
2016-11-14 14:07:59.338 ERROR 8165 --- [afka-consumer-1] essageListenerContainer$ListenerConsumer : Container exception
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059) ~[kafka-clients-0.10.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:939) ~[spring-kafka-1.1.1.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:816) ~[spring-kafka-1.1.1.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:526) ~[spring-kafka-1.1.1.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_92]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_92]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]
Любой, кто знаком с этим?
Я использую Kafka 0.10.1.0
и
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
</properties>
У меня нет. Но я устанавливаю таймаут до 10 секунд и максимальный размер партии до 1, и эта ошибка появляется в течение нескольких секунд (<10). – Raf
Кроме того, отсутствие искажения со смещением не приводит к ошибке. Нет необходимости настраивать время ожидания пакетной обработки. – Raf
ОК. Было бы лучше, если бы вы придумали какое-то простое приложение Spring Boot для игры с нашей стороны. Ваша stacktrace не связана с 'initPartitionsIfNeeded()', поэтому непонятно, как '0' может влиять на регулярные' poll() 'и' commitSync() ' –