2016-12-06 4 views
0

Я пытаюсь, чтобы x число пользователей обращалось к определенной теме в kafka, но не потребляло одни и те же сообщения. Я хочу, чтобы к примеру ...Kafka Consumer Cluster Environment Offset

Потребитель 1 выбирают вверх смещение 1 Потребитель 2 выбирает вверх смещение 2 Потребитель 1 выбирает вверх смещение 3 Потребитель 2 выбирает вверх смещение 4

Я хочу Кафка, чтобы выступать в качестве очереди для этих двух потребителей. Я заметил конфигурацию group.id, и я предположил, что вы можете использовать одну и ту же группу, и она будет обрабатывать ее соответственно, но она не работает так, как я думал.

Вот код, я использую ...

 public void init(){ 
      Properties props = new Properties(); 
      props.put("bootstrap.servers", kafkaUrl); 
      props.put("key.deserializer", StringDeserializer.class.getName()); 
      props.put("value.deserializer", StringDeserializer.class.getName()); 
      props.put("enable.auto.commit", "true"); 
      props.put("group.id", "group1"); 
      props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress()); 

      consumer = new KafkaConsumer<>(props); 
      consumer.subscribe(Arrays.asList("event1", "event2")); 

      Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS); 
    } 

    public void pollTopics() { 
     try { 
      ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 

      for (ConsumerRecord<String, String> record : records) { 
       AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore); 
       if(processor != null) { 
        kafkaThreadPool.execute(processor); 
       } 
      } 
     }catch (Exception e){ 
      LOG.error("Polling exception occurred", e); 
     } 
    } 

Я хочу, чтобы иметь возможность запускать этот код в кластерной среде и имеют Кафка быть очереди. Я хочу, чтобы он вытащил сообщение и одновременно перешел к следующему смещению, затем следующий опрос кафки захватит следующее смещение. Это возможно? И если да, то что я делаю неправильно?

ответ

1

Это невозможно в Кафке (так, как вы его описали).

Если вы используете группы потребителей, один раздел может быть прочитан только одним потребителем. Таким образом, Kafka действительно масштабируется по разделам, т. Е. Если вы хотите иметь несколько потребителей (считывая разные данные), для каждого потребителя требуется как минимум один раздел. Если у вас больше разделов, чем у потребителей, некоторые (или все) потребители будут одновременно читать несколько разделов.

Решение для вас - создать тему с несколькими разделами (или использовать несколько тем и позволить всем пользователям вашей группы подписаться на темы).

+0

Хорошо, что имеет смысл, но я читал, что если у вас есть 2 раздела, у вас должно быть как минимум 2 потребителя. Итак, что произойдет, если один из потребителей опустится на час? Другие потребляющие не будут получать эти сообщения? –

+0

Подождите, я думаю, вы скажете, что если есть 2 раздела и только один потребитель, который он будет выбирать из обоих? Поэтому просто чтобы я правильно понял. Если я создаю два раздела и имею двух потребителей, он должен анализировать разные сообщения, и если кто-то действительно опустится, тогда другой потребитель получит все сообщения от обоих разделов? Если это правильно, у вас есть пример того, что необходимо для подписки на определенные разделы? Если это так, я приму свой ответ :). –

+0

Я также просто прочитал этот параграф с веб-сайта kafkas. «Концепция потребительской группы в Kafka обобщает эти две концепции. Как и в случае с группой потребителей, вы можете разделить обработку по совокупности процессов (членов группы потребителей). Как и в случае публикации-подписки, Kafka позволяет вам передавать сообщения нескольким группам потребителей ». Это говорит о том, что то, что я пытаюсь выполнить, возможно. Просто не уверен, как это сделать. –