Я пытаюсь, чтобы x число пользователей обращалось к определенной теме в kafka, но не потребляло одни и те же сообщения. Я хочу, чтобы к примеру ...Kafka Consumer Cluster Environment Offset
Потребитель 1 выбирают вверх смещение 1 Потребитель 2 выбирает вверх смещение 2 Потребитель 1 выбирает вверх смещение 3 Потребитель 2 выбирает вверх смещение 4
Я хочу Кафка, чтобы выступать в качестве очереди для этих двух потребителей. Я заметил конфигурацию group.id, и я предположил, что вы можете использовать одну и ту же группу, и она будет обрабатывать ее соответственно, но она не работает так, как я думал.
Вот код, я использую ...
public void init(){
Properties props = new Properties();
props.put("bootstrap.servers", kafkaUrl);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("group.id", "group1");
props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("event1", "event2"));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
}
public void pollTopics() {
try {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
if(processor != null) {
kafkaThreadPool.execute(processor);
}
}
}catch (Exception e){
LOG.error("Polling exception occurred", e);
}
}
Я хочу, чтобы иметь возможность запускать этот код в кластерной среде и имеют Кафка быть очереди. Я хочу, чтобы он вытащил сообщение и одновременно перешел к следующему смещению, затем следующий опрос кафки захватит следующее смещение. Это возможно? И если да, то что я делаю неправильно?
Хорошо, что имеет смысл, но я читал, что если у вас есть 2 раздела, у вас должно быть как минимум 2 потребителя. Итак, что произойдет, если один из потребителей опустится на час? Другие потребляющие не будут получать эти сообщения? –
Подождите, я думаю, вы скажете, что если есть 2 раздела и только один потребитель, который он будет выбирать из обоих? Поэтому просто чтобы я правильно понял. Если я создаю два раздела и имею двух потребителей, он должен анализировать разные сообщения, и если кто-то действительно опустится, тогда другой потребитель получит все сообщения от обоих разделов? Если это правильно, у вас есть пример того, что необходимо для подписки на определенные разделы? Если это так, я приму свой ответ :). –
Я также просто прочитал этот параграф с веб-сайта kafkas. «Концепция потребительской группы в Kafka обобщает эти две концепции. Как и в случае с группой потребителей, вы можете разделить обработку по совокупности процессов (членов группы потребителей). Как и в случае публикации-подписки, Kafka позволяет вам передавать сообщения нескольким группам потребителей ». Это говорит о том, что то, что я пытаюсь выполнить, возможно. Просто не уверен, как это сделать. –