2016-12-06 3 views
1

Какой должен быть лучший подход при внедрении потребителя кафки.Многопоточный потребитель или потребительский продукт Kafka

Задача считывается с Kafka и записывается обратно в db. Миллионы рядов

Подход 1: За раздел - для потребителя - Подождите, пока сообщение будет потреблено (т. Е. Записано обратно в db), затем перейдите к следующему в цикле опроса.

Подход 2: За раздел - на одного потребителя - Отправлять запись в рабочий поток или поток, который должен быть записан обратно в db, а затем совершать смещение и продолжать опрос. Управление смещением необходимо принимать во внимание. В этом не ждите сообщения, которое должно быть записано обратно в БД. Просто продолжайте опрос, передайте сообщение рабочей теме.

Любые идеи по обеим из них?

Благодаря

ответ

1

Подход 1: подход применим только, если это возможно для Вас, чтобы оценить время обработки сообщений в противном случае она не рекомендуется.

Проблема: В этом подходе основная проблема заключается в том, чтобы поддерживать потребителей в живом состоянии. Если вы будете ожидать, что сообщения будут полностью обработаны до вызова опроса(), вы должны убедиться, что ваш потребитель должен быть жив пока он не вызовет poll(), потому что kafka сохраняет свойство с именем session.timeout.ms. Брокер/кластер kafka берет на себя действие по стоимости этого свойства, если потребитель не может снова вызвать опрос() за период времени «session.timeout.ms», брокер пометит потребителя мертвым, и его вышвырнут , Теперь, когда потребитель завершит обработку сообщения и снова вызовет poll(), он будет считаться новым столяром и снова предоставит набор записей, начиная со смещения, как это было раньше. Учитывая этот сценарий, потребитель будет застревать в бесконечном цикле, где он никогда не начнет смещение.

Возможное решение 1: Чтобы использовать этот подход, вам нужно хорошее значение следующего свойства «session.timeout.ms» со следующими побочными эффектами:

1: Значение слишком низким: Потребитель будет помечен мертвым как описано выше, и никогда не будет выполнять свое смещение, однако сообщения будут обработаны, но каждый раз, когда он заканчивает сообщения, он снова получит предыдущие сообщения + новые сообщения.

2: Слишком высокое значение: Брокер будет очень поздно обнаруживать подлинный сбой потребителя, что приведет к дублированию записи и приведет к общей пропускной способности.

Возможное решение 2: (действует только для версии 0.10.1.x) Официальное исправление Kafka в выпуске (0.10.1.0). В этом подходе вводятся два заметных объекта: новое свойство «max.poll.interval.ms», которое устанавливает максимальную задержку между клиентскими вызовами poll() и фоновым потоком, который отвечает за сохранение жизни потребителя. Таким образом, в сценарии, когда потребитель вызывает метод poll(), а затем становится занятым обработкой сообщений, внутренний фоновый поток будет поддерживать оживление сердца, и в результате потребитель останется в живых. Однако этот внутренний фоновый поток сам останется в живых до тех пор, пока значение таймаута для свойства «max.poll.interval.ms» останется в силе. Таким образом, этот поток будет ждать, пока потребитель вызовет poll() в течение периода времени «max.poll.interval.ms», если нет, он отправит запрос на отпуск и сам умрет.«

Снова сложная часть этого решения заключается в том, чтобы найти подходящее значение этого свойства:« max.poll.interval.ms »(очень важно, на этот раз будет время, в течение которого фоновый поток будет сохранять сердцебиение живым без явного вызова опроса())

подход 2:. Использования рабочего потока является хорошей идеей, но тогда вы должны поддерживать внутреннюю очередь или проверки для принятых сообщений, которые могут быть сложными, а также вам нужно Для получения дополнительной информации о фиксации см. this и рубрика «Commits and Offsets».

Проблема: В этом подходе основной проблемой является отслеживание полученных сообщений и успешных сообщений. Так как ваш потребитель получит сообщение, он передаст сообщение соответствующему рабочему потоку и скорректирует смещение и переместится вперед, чтобы получить больше сообщений. Во время этого процесса вам необходимо позаботиться о следующих проблемах:

  1. Что делать, если сообщение получено и выполнено смещение, но позже по какой-либо причине рабочий поток не смог обработать сообщение, теперь, как получить это сообщение еще раз?
  2. Что делать, если сообщения получены потребителем, но нет свободных рабочих потоков для обработки?

Решение: Там могут быть разные пути решения вышеуказанных проблем и один из способов заключается в использовании внутренней очереди, чтобы сохранить сообщения и руководство совершает который будет отправлен только тогда, когда рабочий поток сообщит об успешном обработку сообщение. Однако требуется очень тщательная реализация, поскольку это может привести к сложному коду, а также может привести к проблемам управления памятью или потоками.

Предложение: В зависимости от ваших требований вы можете использовать один или другой подход, применяя фиксированные для возможных проблем, как описано выше. Однако я бы рекомендовал более надежное решение - использовать паузу/возобновление раздела. В очень абстрактном виде ваш потребитель должен выполнить следующие шаги:

1: poll() для сообщений.

2: приостановить все соответствующие темы/разделы.

3: Назначение сообщений рабочим потокам и ожидание их обработки.

4: Сохранять вызов poll(), но по мере приостановки разделов не будет получено никакого дополнительного сообщения, пока потребитель останется в живых. (Убедитесь, что в данный момент не зарегистрировано ни одной новой темы)

5: Если все рабочие потоки должны сообщать об успешном завершении обработки/сбое, выполните соответствующие смещения.

6: Возобновите все разделы.

Примечание: в зависимости от вашего сценария и требований возможны лучшие способы или другие решения. Это всего лишь идея или одно из возможных решений.

+0

Спасибо TechMaster – bittu

 Смежные вопросы

  • Нет связанных вопросов^_^