2016-03-08 1 views
2

Типичный Кафка потребитель выглядит следующим образом:Предотвращение потери сообщения с Кафка высокого уровня потребительских 0.8.x

Кафка-брокер ---> Кафка-потребитель ----> вниз по течению потребителя как Elastic-Search

И в соответствии с документацией для Kafka High Level Consumer:

установка «auto.commit.interval.ms», как часто обновляется с потребленных смещений записываются Zookeeper

Кажется, что может быть потеря сообщений, если следующие две вещи:

  1. Смещения совершаются только после того, как некоторые сообщения извлекаются из Кафки брокеров.
  2. Потребители нижестоящих потоков (скажем, Elastic-Search) не обрабатывают самую последнюю партию сообщений или сам потребительский процесс убит.

Возможно, было бы самым идеальным, если зачеты являются не совершившие автоматически на основе временного интервала, но они совершены API. Это позволит убедиться, что потребитель кафки может сигнализировать о совершении смещений только после того, как он получит подтверждение от нисходящего потребителя, что они успешно использовали сообщения. Там может быть некоторая повторная передача сообщений (если kafka-consumer умирает до совершения смещений), но по крайней мере потеря сообщений не будет.

Пожалуйста, дайте мне знать, существует ли такой API для потребителя высокого уровня.

Примечание: Я знаю API-интерфейс низкого уровня в версии 0.8.x Kafka, но я не хочу самостоятельно управлять всем, когда все, что мне нужно, это всего лишь один простой API для пользователей высокого уровня.

Ref:

  1. AutoCommitTask.run(), искать commitOffsetsAsync
  2. SubscriptionState.allConsumed()

ответ

3

Существует а commitOffsets() API в High Level Consumer API, которые могут быть использованы для решения этой проблемы.

Также установите опцию «auto.commit.enable» на «false», чтобы ни в коем случае смещения автоматически выполнялись потребителем kafka.