2013-09-23 3 views
0

Я создаю несколько тем в Кафке. Я хочу получить всю тему от Kafka, и у меня разные болты и болты. И я хочу отправить каждую тему на соответствующий носик и связанный болт (например, для темы1 у меня есть соответствующий spout1 и bolt1, для topic2 у меня есть соответствующий spout2 и bolt2 и т. д.)
Как я могу это сделать?Извлечение тем из Кафки и отправка на соответствующий носик и болт

ответ

0

Хотя я не понимаю, что вы пытаетесь сделать (у вас есть отдельные топологии для каждой темы?), Как правило, что вы можете сделать, в вашем spout1 создать пользователя, который будет подписан на тему1 и испускать как только он его получит. а затем подключите выход к соответствующим болтам для дальнейшего выполнения.

Но, насколько я понял, вы должны взглянуть на реализацию проекта github storm-contrib на KafkaSpout. В основном это реализация носика, которая считывается из кластера Kafka, и все, что вам нужно, - это правильно настроить конфигурацию.

Из документации она в основном выглядеть как этот

SpoutConfig spoutConfig = new SpoutConfig(
       ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers 
       8, // number of partitions per host 
       "clicks", // topic to read from 
       "/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets 
       "discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper 
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

Одна вещь, чтобы упомянуть здесь выше реализация использует Кафку 0.7 так и в случае, если вы работаете с последним (0,8, и вы должны) осуществление вы можете найти опору 0,8 here