2015-03-17 5 views
1

Я пытаюсь использовать потребителя высокого уровня для пакетного чтения сообщений в теме Kafka. Во время этой серии чтения моя нить должна остановиться в какой-то момент.Kafka потребитель высокого уровня

Либо, как только все сообщения в теме исчерпаны. или Get максимальное смещение в точке, когда сообщения должны быть считаны, и остановить до достижения максимального смещения.

Я пытался использовать код на high-level-consumer но методы итераторов на KafkaStream, кажется, блокирующий вызов и ждет, пока еще сообщения приходит.

Так 3 вопроса,

  1. Как узнать, что больше нет сообщений, которые нужно читать из этой темы?

  2. Если у меня есть ответ на вышеупомянутый вопрос, как я могу остановить его от прослушивания темы?

  3. Есть ли способ найти максимальное смещение при запуске пакетного чтения (я думаю, простой пользователь может это сделать) и сделать остановку высокого уровня в этой точке?

ответ

0

У вас есть возможность решить, что, когда новое сообщение не поступило за определенное количество времени, вы считаете, что все сообщения были прочитаны. Это можно установить с использованием потребительской собственности consumer.timeout.ms. После того, как это указанное значение прошло без каких-либо новых сообщений, ConsumerIterator выкинет исключение тайм-аута, и вы сможете обработать его у потребителя и выйти.