2016-04-10 7 views
4

Ниже мой случай использованияКафка и Akka Cluster

  1. Куча приложений епдиеие сообщений в Кафки под разные темы.
  2. Потребитель каждой темы распределяет работу работнику в кластере. Работа может быть классифицирована как долго работающая, интенсивная память, простая и т. Д., И рабочий выбирается соответствующим образом.

Это я изучаю кластер Akka для распределения работы, маршрутизации и масштабирования. Я могу использовать Akka «Supervisor» в качестве потребителя Kafka и назначать входящую работу соответствующему работнику на основе его классификации.

Но я все еще пытаюсь понять, это правильный способ реализовать устойчивый способ общения между руководителем и рабочими кластера Akka. Потому что, как только супервизор потребляет сообщение от Kafka, смещение Kafka совершается. Если при обработке смещения происходит некоторая ошибка, является ли следующий приемлемый способ восстановления и начать с того места, где он был последним?

Сделать супервизора постоянным актером, используя прочный почтовый ящик под управлением Kafka. Супервайзер завершает работу в Кафке, и работник получает работу от Кафки и совершает ее компенсацию только после завершения работы.

+1

Привет, какую библиотеку вы используете для потребляющих от Кафки? Библиотека, которую вы используете, может иметь возможность не фиксировать сообщения автоматически после потребления. –

ответ

1

Как сказал Яакко, это действительно зависит от библиотеки третьей части, которую вы используете.

Насколько я могу судить, я успешно использовал Akka Streams Kafka, хотя я включил автоматическую фиксацию смещения.

Однако эта библиотека может удовлетворить ваши потребности, поскольку она позволяет вам настроить фиксацию смещения (см. Разделы External Offset Storage и Offset Storage in Kafka).

В документации сказано:

Consumer.committableSource позволяет совершать смещенные позиции Кафки. По сравнению с автоматическим фиксацией это дает точное управление, когда считается, что сообщение считается потребляемым.

Для того, чтобы отключить автоматической фиксации, вы должны выполнить файл Akka application.conf путем добавления akka.kafka.consumer раздел:

akka.kafka.consumer { 

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig 
    # can be defined in this configuration section. 

    kafka-clients { 
    # Disable auto-commit by default 
    enable.auto.commit = false 
    } 

} 

Последняя версия akka-stream-kafka_2.11 (версия 0.16) совместим с Акку 2.5.x, но вам должны переопределить зависимость akka-stream_2.11 с одним из инструментов Akka. В настоящее время я использую эту библиотеку с Akka 2.5.3, и она работает очень хорошо.

Надеется, что вы найдете то, что ищете :)