2015-12-17 1 views
1

Сценарий:Как определить необработанные сообщения после Кафки потребления тему

потока создания [StreamName] --definition "Кафка -zkconnect = 10.10.10.1: 2181 --topic = | MyCompositeModule" --deploy

Мы запускаем этот поток в распределенном режиме, а redis - транспортная шина. По моему пониманию, источник kafka поддерживает смещения для сообщений, потребляемых MyCompositeModule (который является раковиной, как его модуль, созданный с помощью процесса создания модуля) через тему [streamname] -kafka-offsets. Это нечитаемо, и я был бы признателен, если бы был способ прочитать данные из этой темы.

Кроме того, когда я нажимаю сообщения из источника kafka, сообщения ставятся в очередь в redis transport, затем модуль извлекает их из этой очереди.

Если потребительский модуль kafka начинает потреблять 1000 сообщений из очереди kafka redis, а составной модуль выходит из строя после приема 10 сообщений или случайно обработанных 10 сообщений. Так как идентифицировать оставшиеся 990 [1000 (потребляемых) - 10 (обработано) = 990] необработанные сообщения. Даже если мы проверим смещения кафки, он покажет количество потребляемых сообщений. пример: -kafka.offsets - который не читается в нашем процессе.

Таким образом, все необработанные сообщения будут в очереди Redis, поскольку мы используем Redis в SpringXD. Так может ли кто-нибудь помочь мне узнать, как идентифицировать необработанные сообщения и пересылать их в составный модуль для обработки.

В принципе, я ищу рекомендации по элегантному решению для надежной доставки, добавляя возможность отказоустойчивости в весенний поток xd при потреблении от источника кафки.

ответ

1

Если сообщения эффективно расходуются от Kafka и перемещаются на автобус, они будут признаны потребляемыми с точки зрения менеджера смещения.

Вы можете попробовать включить повторную и мертвую надписи для Redis Message Bus, как описано здесь: http://docs.spring.io/spring-xd/docs/current/reference/html/#error-handling-message-delivery-failures.

Cheers, Marius