2016-11-02 8 views
1

Недавно я начал использовать Kafka и оценил Kafka для нескольких случаев использования.Внедрение фильтрации для сообщений kafka

Если бы мы хотели предоставить возможность фильтровать сообщения для потребителей (подписчиков) на основе содержимого сообщений, что лучше всего подходит для этого?

Say тема под названием «Торговля» экспонируются производителем, который имеет различные сделки детали, такие как название рынка, дата создания, цены и т.д.

Некоторые потребители заинтересованы в торгах за конкретные рынки и другие заинтересованы в торги после определенной даты и т.д. (содержание фильтрации на основе)

Как фильтрации не представляется возможным на брокерскую стороне, что является лучшим из возможных подходов для реализации ниже случаев:

  1. Если критерии фильтрации специфичен для потребителя. Должны ли мы использовать Потребительский перехватчик (хотя рекомендуется использовать перехватчик для регистрации цели согласно документации)?
  2. Если критерии фильтрации (фильтрация на основе контента) распространены среди потребителей, каков должен быть подход?

Слушайте тему и фильтровать сообщения локально и писать в новую тему (с использованием либо перехватчика или потоков)

+0

Что вы сделали до сих пор? –

+0

Не знаете, на что вы нацелились. Вы не можете делать фильтрацию на стороне брокера, если это будет целью. Или вы хотите использовать тему, фильтровать ее и писать в новую тему? Вы можете уточнить свой вопрос? –

+0

@ MatthiasJ.Sax, я обновил сообщение и предоставил пример. – RGoyal

ответ

0

Создайте свой собственный класс перехватчика, который реализует org.apache.kafka.clients.consumer.ConsumerInterceptor и реализовать ваша логика в методе 'onConsume' перед настройкой конфигурации "interceptor.classes" для потребителя.

+0

, поскольку перехватчик главным образом предназначен для ведения журнала и целей мониторинга. Реализация фильтрации на перехватчике будет означать эффективную реализацию фильтра, включающего потребительский код. – RGoyal

+0

, так что вы хотите написать логику фильтрации на стороне сервера? – amethystic

+0

да, я хотел что-то вроде фильтрации на стороне сервера – RGoyal

1

Если вы правильно поняли, что у вас есть вопрос, у вас есть одна тема и разные потребители, которые интересуются конкретными частями темы. В то же время, вы не являетесь владельцем этого потребителя и хотите избежать того, чтобы этот потребитель просто прочитал всю тему и сделал фильтрацию самостоятельно?

Для этого единственный способ сделать это для создания нового приложения, которое читает всю тему, выполняет фильтрацию (или фактически разделяет) и записывает данные на две (несколько) разных тем. Внешний потребитель был бы потребителем с этих новых тем и получал бы только дату, в которой они заинтересованы.

Использование Kafka Streams для этой цели было бы очень хорошим способом. DSL должен предложить все, что вам нужно.

В качестве альтернативы вы можете просто написать собственное приложение, используя KafkaConsumer и KafkaProducer, чтобы выполнить фильтрацию/разделение вручную в вашем пользовательском коде. Это не сильно отличается от использования Kafka Streams, поскольку приложение Kafka Streams будет делать то же самое внутри. Однако с Streams ваши усилия, чтобы сделать это, были бы меньше.

Я бы не использовал перехватчики для этого. Даже это будет работать, похоже, это не хороший дизайн программного обеспечения для вашего случая использования.

+0

Спасибо. ваше понимание правильное. Некоторый потребитель принадлежит нашему приложению, а некоторые нет. Таким образом, единственный способ сделать фильтрацию - это использовать данные и написать данные наполнителя по новой теме? – RGoyal

+0

Да. Если вы не являетесь владельцем некоторых потребителей, вы можете только запретить им читать все, написав интересующие их части в новую тему. –