2016-02-23 1 views
0

Я видел ниже код для использования сообщений от Kafka. Существует 1 тема с 20 разделами и 20 потоков создаются с помощью ExecutorService. Есть 20 потоков сообщений, каждый из которых считывается из 1 раздела. Когда эта программа будет запущена, 20 сообщений будут прочитаны и будут обработаны из темы. Когда один из этих потоков завершит обработку, я предполагаю, что будет прочитано следующее сообщение.Потребление сообщений с использованием Kafka Consumer - Java

Если в сценарии примера, где в теме 100 сообщений, будут ли все сообщения прочитаны и сохранены в памяти и будут обрабатываться одновременно по потокам или сообщение будет прочитано из темы только после сообщений обрабатываются потоком, обрабатываются?

public void run(int a_numThreads) { 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(a_numThreads)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

    // now launch all the threads 
    // 
    executor = Executors.newFixedThreadPool(20); 

    // now create an object to consume the messages 
    // 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ConsumerTest(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

EDIT: Я нашел ответ в этом post. Однако у меня есть следующие вопросы:

Если есть 20 разделов для одной темы, могу ли я запустить пользователя на двух разных узлах? Должен ли я упоминать количество потоков сообщений как 10 в каждом из этих потребителей? Когда I узел выходит из строя или возникает проблема с производительностью, будет ли поток данных автоматически перебалансироваться с рабочим узлом?

ответ

1

Да, вы можете запускать более одного потребителя на разных узлах, чтобы потреблять из той же темы. Количество потоков сообщений может быть 10 разным в зависимости от конфигурации машины. Если его крошечная машина вы можете дать 5 или около того.

Если один узел не работает, он автоматически переносится для загрузки на другой узел. Помимо отказа есть другие свойства, такие как topic.metadata.refresh.interval.ms, который решает, когда перебалансировать для загрузки.

+0

Спасибо, Paresh. Если есть 20 разделов, могу ли я задать 20 разделов на обоих узлах? Если не удастся, я думаю, что все 20 разделов будут обработаны 20 потоками у одного потребителя. –

+0

Разделы указываются при создании темы. Если в вашей конфигурации 1 брокер, все 20 разделов находятся в одном брокере. Если у вас есть 2 брокера, разделы распределяются. Вам не нужно указывать количество разделов на уровне потребителя. Поэтому, чтобы ответить на этот вопрос: «Если кто-то терпит неудачу, я думаю, что все 20 разделов будут обработаны 20 потоками у одного потребителя». Да, если кто-то не удается, все разделы перемещаются на один узел и обрабатываются одним потребителем. – Paresh

+0

Извините, я не понял свой вопрос. Предположим, что в моем кластере есть 2 узла и 20 разделов. Я предполагаю, что 20 разделов будут распределены между двумя узлами, и трафик перебалансирует себя. У меня есть потребительский проект, который создает 20 потоков сообщений и обрабатывает данные с использованием 20 потоков. Если я разворачиваю одно и то же приложение в 2-х узлах, будет обработано 10 разделов одним потребительским приложением-экземпляром, а остальные 10 будут обработаны другим экземпляром. –