2017-01-02 7 views

ответ

0

Я не уверен, правильно ли я понял ваш вопрос. Тем не менее, я думаю, что существует несколько подходов (в зависимости от того, чего вы действительно хотите достичь, что непонятно из вопроса).

  1. Использование Кафка Streams DSL (Кафка 0,10): Используя Кафка Streams (библиотека, обработок потоков Java) можно указать агрегацию окна в барабанном окне любого размера
  2. Exploit метки времени (Кафка 0,10): если вы хотите использовать KafkaConsumer, вы можете просто читать сообщения и проверять их временные метки для передачи данных по интервалам
  3. Системное время (все версии Kafka): просто прочитайте сообщения от Kafka и поместите сообщения в интервалы базы в системное время. То есть, прежде чем обрабатывать следующую запись, вы проверяете локальные часы, чтобы помещать сообщения в промежутки времени.
+0

То, что я имею в виду, есть возможность вытащить данные из Кафки каждые 1 минуту (например: в 10:01 читать все записи с 10:00 до 10:01, в 10:02 читать все записи с 10:01 до 10:02 и т. д.) вместо того, чтобы получать новые записи во время выполнения? Я хочу читать данные для обработки каждого заданного интервала вместо хранения данных в памяти до обработки. – user7365161

+0

Для этого нет встроенной поддержки, потому что Kafka основан на pull. Вы должны поместить эту логику в клиент, используя один из предложенных подходов. Если я правильно понимаю, что вы правильно комментируете, вы можете использовать подход (3) в сочетании, чтобы получить текущие смещения конца журнала перед началом опроса(), и только потребительские сообщения к полученным смещениям (чтобы избежать чтения записей, которые добавлены после того, как вы начнете потреблять) –