2016-12-29 4 views
4

Я сделал свежую установку Apache Kafka 0.10.1.0.понимание потребительская группа ID

Я смог отправить/получить сообщения в командной строке.

При использовании примера производителя/потребителя Java я не могу узнать group.id параметр «Пример потребителя».

Сообщите мне, как исправить эту проблему.

Ниже Потребительский Пример я использовал:

public static void main(String[] args) { 
      Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "my-topic"); 
      props.put("enable.auto.commit", "true"); 
      props.put("auto.commit.interval.ms", "1000"); 
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
      try { 
       consumer.subscribe(Arrays.asList("my-topic")); 

        ConsumerRecords<String, String> records = consumer.poll(100); 
        System.err.println("records size=>"+records.count()); 
        for (ConsumerRecord<String, String> record : records) 
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 



       } 
      catch (Exception ex){ 
       ex.printStackTrace(); 
      } 
      finally { 
       consumer.close(); 
      } 
     } 

После запуска команды для потребителя, я могу увидеть сообщения (на консоли) разместила производителем. Но не могут видеть сообщения от Java программы

бен \ Windows \ Кафка-консоли consumer.bat --bootstrap-сервер локальный: 9092 --topic моя-тема --from-начало

+0

Если вы запускаете своего java-пользователя и создаете некоторые сообщения ПОСЛЕ запуска его, вы все еще не видите, что какое-либо сообщение потребляется? –

+0

да, я получаю сообщение на консоли «records size => 0» – Ankit

ответ

7

Потребители обозначают себя именем группы потребителей, и каждая запись , опубликованная на эту тему, доставляется одному экземпляру потребителя в каждой группе пользователей, подпишивающейся . Потребители могут быть в отдельных процессах или на отдельных машинах.

Если все экземпляры потребителя имеют одну и ту же группу потребителей, то записи будут эффективно сбалансированы по нагрузке над экземплярами потребителей.

Если все экземпляры потребителей имеют разные группы потребителей, то каждая запись будет передаваться всем потребительским процессам.

Группа.id - это строка, которая однозначно идентифицирует группу потребительских процессов, к которой принадлежит этот потребитель.

(Kafka intro)

+0

Но как бы я знал эту строку, так как я не упоминал ни одной group.id при отправке сообщения? – Ankit

+0

вы можете использовать любую строку, если вы запустите двух пользователей, имеющих одну строку, они будут в одной группе. –

+0

Я не смог увидеть сообщения после размещения group.id над java-программой. Тем не менее, я могу видеть сообщение о выполнении команды (упомянутой выше) на консоли. По иронии судьбы, нет никакой group.id, необходимой для просмотра сообщений для потребителя. – Ankit

0

В коде вы условии, что вы просто ждать данных один раз для 100мс. Вы должны получить данные в цикле или дождаться более длительного периода времени (в этом случае вы получите только одну часть данных). Что касается «group.id», то в случае, когда вы запускаете пользователя с консоли, он получает случайный «group.id».

0

Поскольку никакого смещения не было предоставлено, клиент java будет ждать новых сообщений, но не будет показывать существующие сообщения - это как и ожидалось. Если один намерено прочитать все сообщения, уже в этой теме можно использовать этот кусок кода:

if (READ_FROM_BEGINNING) { 
    //consume all the messages from the topic from the beginning. 
    //this doesn't work reliably if it consumer.poll(..) is not called first 
    //probably because of lazy-loading issues    
    consumer.poll(10); 
    consumer.seekToBeginning(consumer.assignment()); //if intending to 
    //read from the beginning or call below to read from a predefined offset. 
    //consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET); 
} 
0

Дайте любое случайное значение идентификатора группы ... это не имеет значения

props.put («group.id», «Любое случайное значение»);