4

У меня есть два покупателя Kafka ConsumerA и ConsumerB. Я хочу, чтобы эти два покупателя кафки были независимы друг от друга на одной машине. Между ними нет никакой связи. Эти два потребителя кафки будут работать по разным темам на одной машине.Как управлять несколькими потребителями кафки на одном и том же ящике независимо друг от друга?

  • Каждый потребитель должен иметь другой объект Properties.
  • Каждый потребитель должен иметь другую конфигурацию пула потоков, поскольку они могут выполняться многопоточным способом (группа потребителей), если это необходимо независимо от другого потребителя.

Ниже мой дизайн:

потребительского класса (аннотация):

public abstract class Consumer implements Runnable { 
    private final Properties consumerProps; 
    private final String consumerName; 

    public Consumer(String consumerName, Properties consumerProps) { 
     this.consumerName = consumerName; 
     this.consumerProps = consumerProps; 
    } 

    protected abstract void shutdown(); 
    protected abstract void run(String consumerName, Properties consumerProps); 

    @Override 
    public final void run() { 
     run(consumerName, consumerProps); 
    } 
} 

ConsumerA класс:

public class ConsumerA extends Consumer { 
    private final AtomicBoolean closed = new AtomicBoolean(false); 
    private KafkaConsumer<byte[], byte[]> consumer; 

    public ConsumerA(String consumerName, Properties consumerProps) { 
     super(consumerName, consumerProps); 
    } 

    @Override 
    public void shutdown() { 
     closed.set(true); 
     consumer.wakeup(); 
    } 

    @Override 
    protected void run(String consumerName, Properties consumerProps) { 
     consumer = new KafkaConsumer<>(consumerProps); 
     consumer.subscribe(getTopicsBasisOnConsumerName()); 

     Map<String, Object> config = new HashMap<>(); 
     config.put(Config.URLS, TEST_URL); 
     GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config); 

     try { 
      while (!closed.get()) { 
       ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE); 
       for (ConsumerRecord<byte[], byte[]> record : records) { 
        GenericRecord payload = decoder.decode(record.value()); 
        // extract data from payload 
        System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", 
             record.topic(), record.partition(), record.offset(), record.key(), record.value()); 
       } 
       consumer.commitAsync(); 
      } 
     } catch (WakeupException ex) { 
      // Ignore exception if closing 
      System.out.println("error= ", ex); 
      if (!closed.get()) throw e;    
     } catch (Exception ex) { 
      System.out.println("error= ", ex);  
     } finally { 
      try { 
       consumer.commitSync(); 
      } finally { 
       consumer.close(); 
      } 
     } 
    } 
} 

ConsumerA B Класс:

// similar to `ConsumerA` but with specific details of B 

ConsumerHandler класс:

public final class ConsumerHandler { 
    private final ExecutorService executorServiceConsumer; 
    private final Consumer consumer; 
    private final List<Consumer> consumers = new ArrayList<>(); 

    public ConsumerHandler(Consumer consumer, int poolSize) { 
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize); 
    this.consumer = consumer; 
    for (int i = 0; i < poolSize; i++) { 
     this.consumers.add(consumer); 
     executorServiceConsumer.submit(consumer); 
    } 
} 
    public void shutdown() { 
    Runtime.getRuntime().addShutdownHook(new Thread() { 
     @Override 
     public void run() { 
     for (Consumer consumer : consumers) { 
      consumer.shutdown(); 
     } 
     executorServiceConsumer.shutdown(); 
     try { 
      executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS); 
     } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
     } 
     } 
    }); 
    } 
} 

Ниже мой основной класс в одном из моих проектов, где, если я начинаю мой сервер, вызовы будут приходить первым автоматически и с этого места я начинаю все Кафка где я выполняю свои ConsumerA и ConsumerB. И как только вызывается выключение, я освобождаю все ресурсы, вызывая выключение всех моих потребителей Kafka.

import javax.annotation.PostConstruct; 
import javax.annotation.PreDestroy; 
import javax.inject.Singleton; 

@Singleton 
@DependencyInjectionInitializer 
public class Initializer { 
    private ConsumerHandler consumerHandlerA; 
    private ConsumerHandler consumerHandlerB; 

    @PostConstruct 
    public void init() { 
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3); 
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3); 
    } 

    @PreDestroy 
    public void shutdown() { 
    consumerHandlerA.shutdown(); 
    consumerHandlerB.shutdown(); 
    } 
} 

Это правильный дизайн для такого рода проблем, когда я хочу запускать нескольких потребителей кафки на одной коробке? Сообщите мне, есть ли лучший и эффективный способ решить эту проблему. В общем, я буду использовать трех или четырех потребителей Kafka в одной коробке, и каждый потребитель может иметь свою собственную группу потребителей, если это необходимо.

Это Javadoc для KafkaConsumer, который я использую как у своего потребителя. И основы на этом article Я создал своего потребителя, просто я использовал абстрактный класс для его расширения. Искать «Взять все это вместе» в этой ссылке.

В документах упоминается, что потребители не являются потокобезопасными, но похоже, что мой код повторно использует один и тот же экземпляр пользователя для каждого потока в пуле.

public ConsumerHandler(Consumer consumer, int poolSize) { 
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize); 
    this.consumer = consumer; 
    for (int i = 0; i < poolSize; i++) { 
     this.consumers.add(consumer); 
     executorServiceConsumer.submit(consumer); 
    } 
} 

Каков наилучший способ решить эту проблему безопасности потоков и по-прежнему достичь тех же возможностей?

+1

Если это рабочий код, вопрос относится к http://codereview.stackexchange.com/ – jaco0646

ответ

-1

Try Apache Samza. Он решает эти проблемы с потребителями. Нет беспорядочной (а иногда и проблематичной) обработки потоков, избыточности с помощью кластеризации, проверенного решения триллионами проверенных обработанных сообщений и т. Д. В настоящее время мы запускаем более одного задания в кластере. Наш код намного менее сложный, чем у вас здесь.

+0

Как Самза поможет мне здесь? Это своего рода обертка, которая потребляет данные от кафки? – john

0

Быстрое предложение, извинения, если вы уже знаете об этом. Переменные уровня класса никогда не являются потокобезопасными. Если вам нужен другой объект Properties для каждого потока, лучше объявите их на уровне метода и предоставите их в качестве параметра другим методам, где вам нужно получить доступ к объекту Properties.

0

Самое простое решение для решения «Каков наилучший способ решить эту проблему безопасности потоков и по-прежнему достичь тех же возможностей?» :

Не выполняйте многопоточность (Thread API/Executor Service), но вместо этого используйте и запускайте каждого потребителя в качестве отдельного потребителя в своем отдельном процессе JVM, поэтому, если вам нужно 4 потребителя на одном компьютере, и вы не хотите справиться с чересчурными головными болями mutli, тогда ваш JAP-код kafka будет работать в своих 4 отдельных Java-процессах.