2016-08-19 5 views
4

Возможно ли иметь приложение Kafka Streams, которое проходит через все данные в теме и затем выходит?Остановить приложение Kafka Streams

Пример: Я делаю данные по темам, основанным на дате. Потребитель получает начало cron, пробегает все имеющиеся данные, а затем .. делает что? Я не хочу, чтобы он сидел и ждал больше данных. Просто предположим, что все там, а потом изящно выходит.

Возможно?

+0

'Я не хочу, чтобы он сидел и ждал больше данных'. Разве это не концепция самого потока? Вы говорите о пакетном планировании, а не потоковой передаче. – Dici

+0

Я немного путаюсь, почему вы приняли ответ ниже. Если мы говорим о библиотеке Kafka Streams, потребитель управляется внутренне, и вы не можете получить к нему доступ ... Итак, как должен работать приведенный ниже метод? –

ответ

3

Вы можете создать consumer, а затем, как только он перестанет подтягивать данные, вы можете позвонить по телефону consumer.close(). Или, если вы хотите снова провести опрос в будущем, позвоните по телефону consumer.pause() и позвоните по телефону .resume.

Один из способов сделать это в блоке опроса потребителей. Такие, как

data = consumer.poll() 
if (!data.next()) { 
    consumer.close() 
} 

Имейте в виду, что poll возвращает ConsumerRecord<K,V> и соответствует интерфейсу Iterable.

+0

Как я узнаю, что это сделано? – ethrbunny

+0

@ethrbunny проверить мое редактирование. – TheM00s3

+0

Этот подход не будет работать для приложения Kafka Streams, как упоминается в комментарии Маттиаса Дж. Сакса к вышеперечисленному вопросу. –

3

В потоках Kafka (как и для других решений для обработки потока), это не «конец данных», поскольку в первую очередь это обработка потока, а не пакетная обработка.

Тем не менее, вы можете наблюдать за «задержкой» вашего приложения Kafka Streams и закрывать его, если нет запаздывания (отставание, количество еще нерасходуемых сообщений).

Например, вы можете использовать bin/kafka-consumer-groups.sh, чтобы проверить отставание приложения Streams (идентификатор приложения используется как идентификатор группы потребителей). Если вы хотите вставить это в свои приложения Streams, вы можете использовать kafka.admin.AdminClient для получения информации о группе потребителей.

 Смежные вопросы

  • Нет связанных вопросов^_^