2017-02-06 16 views
2

У меня есть работа по потоку flink, которая считывает данные с kafka и просто регистрирует ее. Я включил контрольно-пропускные пункты.Флинк не совершает смещения в kafka

Я не вижу замеченных смещений в kafka, вместо этого я становлюсь ниже ошибки.

Любая помощь очень полезна.

{$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group flink-consumer-group 
Error while executing consumer group command Group flink-consumer-group with protocol type '' is not a valid consumer group 
java.lang.IllegalArgumentException: Group flink-consumer-group with protocol type '' is not a valid consumer group 
at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152) 
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308) 
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89) 
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296) 
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68) 
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)} 

Версия

kafka_2.11-0.10.1.0 (server with) flink-connector-kafka-0.10_2.11 
+0

Я наблюдаю ту же проблему, с любопытством, если бы вы смогли ее решить? – Tim

ответ

0

Итак, я понял, что флинк совершает смещения в kafka на чек-пойнте, но эти смещения не видны с помощью калитки-коррекции kafka-offset.

Мы внедрили потребителя scala kafka, который соединяется с kafka с той же группой потребителей, но не подписывается на эту тему, чтобы получить смещения от kafka.

0

Flink обрабатывает коррекции его собственного. Смещения, которые совершаются в kafka (или zookeeper в более старых версиях или настройках), более или менее предназначены только для вашей информации или для целей мониторинга.

Ваша ошибка выглядит так, будто вы перепутали разные версии kafka (версия брокера по сравнению с версией клиента). Возможно, вы можете проверить это дважды.

+0

Спасибо за ответ TobiSh. Я понимаю, что флинк не использует коммандные смещения, но при перезапуске приложения без точки сохранения он должен использовать совершенные смещения wihch в моем случае не происходит. Я использую kafka_2.11-0.10.1.0 (сервер с) flink-connector-kafka-0.10_2.11 – mukh007

+0

Привет @ mukh007 у вас есть исходный код, где мы можем воспроизвести это поведение. Также полезно будет использовать конфигурацию flink-кластера и свойства kafka. – TobiSH

+0

Еще одна вещь. Вы находите что-то подобное в своих файлах журналов: Раздел {} не имеет начального смещения; у потребителя есть позиция {}, поэтому начальное смещение будет установлено на {} – TobiSH

 Смежные вопросы

  • Нет связанных вопросов^_^