Я создаю несколько тем в Кафке. Я хочу получить всю тему от Kafka, и у меня разные болты и болты. И я хочу отправить каждую тему на соответствующий носик и связанный болт (например, для темы1 у меня есть соответствующий spout1 и bolt1, для topic2 у меня есть соответствующий spout2 и bolt2 и т. д.)
Как я могу это сделать?Извлечение тем из Кафки и отправка на соответствующий носик и болт
ответ
Хотя я не понимаю, что вы пытаетесь сделать (у вас есть отдельные топологии для каждой темы?), Как правило, что вы можете сделать, в вашем spout1 создать пользователя, который будет подписан на тему1 и испускать как только он его получит. а затем подключите выход к соответствующим болтам для дальнейшего выполнения.
Но, насколько я понял, вы должны взглянуть на реализацию проекта github storm-contrib на KafkaSpout. В основном это реализация носика, которая считывается из кластера Kafka, и все, что вам нужно, - это правильно настроить конфигурацию.
Из документации она в основном выглядеть как этот
SpoutConfig spoutConfig = new SpoutConfig(
ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
8, // number of partitions per host
"clicks", // topic to read from
"/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets
"discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Одна вещь, чтобы упомянуть здесь выше реализация использует Кафку 0.7 так и в случае, если вы работаете с последним (0,8, и вы должны) осуществление вы можете найти опору 0,8 here
Я создал демонстрационный проект для кафки, который вы должны использовать в качестве отправной точки: https://github.com/buildlackey/cep/tree/master/storm%2Bkafka.
привет/
Крис