2016-07-20 4 views
2

Я написал приложение Kafka Streaming, которое просто фильтрует строки на основе некоторого состояния и загружает его в MongoDB.Сброс сброса потока Kafka в ноль для группы потребителей

Процесс потоковой передачи работает нормально, но из-за некоторой ошибки в моем коде, я хочу снова обрабатывать все данные.

Один из способов - убить потоковое приложение, сменить идентификатор группы клиентов, удалить данные из монго и повторно запустить приложение.

Как достичь этого сценария без изменения идентификатора группы потребителей.

< < Я использую Кафку 0.10 версии >>

Большое спасибо Pari

ответ

4

Apache Кафка 0.10.0.1 (который был выпущен в августе, в то время как оригинальный вопрос был задан в июле) поставляется с новым приложением Reset Tool для Кафки потоков, которые это простое и лучшее/более чистое решение, чем просто переименование application.id.

Вы можете выполнить инструмент с помощью скрипта bin/kafka-streams-application-reset.sh, который также распечатает сообщение об использовании/помощи.

Пример:

# Run this only after ALL application instances were stopped! 
$ bin/kafka-streams-application-reset --application-id my-streams-app \ 
             --input-topics my-input-topic \ 
             --intermediate-topics rekeyed-topic \ 
             --bootstrap-servers brokerHost:9092 \ 
             --zookeeper zookeeperHost:2181 

То есть, я бы рекомендовал прочитать Data Reprocessing with Kafka Streams: Resetting a Streams Application, который написал вышеупомянутый Matthias J. Sax, для получения более подробной информации. В этой статье также объясняется, почему просто переименование application.id (что было обходным решением до сих пор) - не самая лучшая идея.

+0

Есть ли такие темы, как '-KSTREAM-KEY-SELECT-0000000179-repartition', как промежуточные темы? –

+0

Я думаю, согласно http://docs.confluent.io/current/streams/developer-guide.html#internal-topics, это внутренняя тема. –

2

Полученное обновление от Matthias J. Sax [email protected] -

в настоящее время, изменение идентификатора приложения является лучший способ следовать. Правильная очистка состояния приложения, немного сложная. Мы в настоящее время работаем над улучшением для этого - должны быть доступны в ближайшее время.

См https://issues.apache.org/jira/browse/KAFKA-3185

Приветствие Pari

+1

Хороший ответ :-) –

 Смежные вопросы

  • Нет связанных вопросов^_^