У меня есть два покупателя Kafka ConsumerA
и ConsumerB
. Я хочу, чтобы эти два покупателя кафки были независимы друг от друга на одной машине. Между ними нет никакой связи. Эти два потребителя кафки будут работать по разным темам на одной машине.Как управлять несколькими потребителями кафки на одном и том же ящике независимо друг от друга?
- Каждый потребитель должен иметь другой объект Properties.
- Каждый потребитель должен иметь другую конфигурацию пула потоков, поскольку они могут выполняться многопоточным способом (группа потребителей), если это необходимо независимо от другого потребителя.
Ниже мой дизайн:
потребительского класса (аннотация):
public abstract class Consumer implements Runnable {
private final Properties consumerProps;
private final String consumerName;
public Consumer(String consumerName, Properties consumerProps) {
this.consumerName = consumerName;
this.consumerProps = consumerProps;
}
protected abstract void shutdown();
protected abstract void run(String consumerName, Properties consumerProps);
@Override
public final void run() {
run(consumerName, consumerProps);
}
}
ConsumerA класс:
public class ConsumerA extends Consumer {
private final AtomicBoolean closed = new AtomicBoolean(false);
private KafkaConsumer<byte[], byte[]> consumer;
public ConsumerA(String consumerName, Properties consumerProps) {
super(consumerName, consumerProps);
}
@Override
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
@Override
protected void run(String consumerName, Properties consumerProps) {
consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(getTopicsBasisOnConsumerName());
Map<String, Object> config = new HashMap<>();
config.put(Config.URLS, TEST_URL);
GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
try {
while (!closed.get()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<byte[], byte[]> record : records) {
GenericRecord payload = decoder.decode(record.value());
// extract data from payload
System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
} catch (WakeupException ex) {
// Ignore exception if closing
System.out.println("error= ", ex);
if (!closed.get()) throw e;
} catch (Exception ex) {
System.out.println("error= ", ex);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}
ConsumerA B Класс:
// similar to `ConsumerA` but with specific details of B
ConsumerHandler класс:
public final class ConsumerHandler {
private final ExecutorService executorServiceConsumer;
private final Consumer consumer;
private final List<Consumer> consumers = new ArrayList<>();
public ConsumerHandler(Consumer consumer, int poolSize) {
this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
this.consumer = consumer;
for (int i = 0; i < poolSize; i++) {
this.consumers.add(consumer);
executorServiceConsumer.submit(consumer);
}
}
public void shutdown() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (Consumer consumer : consumers) {
consumer.shutdown();
}
executorServiceConsumer.shutdown();
try {
executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
}
}
Ниже мой основной класс в одном из моих проектов, где, если я начинаю мой сервер, вызовы будут приходить первым автоматически и с этого места я начинаю все Кафка где я выполняю свои ConsumerA
и ConsumerB
. И как только вызывается выключение, я освобождаю все ресурсы, вызывая выключение всех моих потребителей Kafka.
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;
@Singleton
@DependencyInjectionInitializer
public class Initializer {
private ConsumerHandler consumerHandlerA;
private ConsumerHandler consumerHandlerB;
@PostConstruct
public void init() {
consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3);
consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3);
}
@PreDestroy
public void shutdown() {
consumerHandlerA.shutdown();
consumerHandlerB.shutdown();
}
}
Это правильный дизайн для такого рода проблем, когда я хочу запускать нескольких потребителей кафки на одной коробке? Сообщите мне, есть ли лучший и эффективный способ решить эту проблему. В общем, я буду использовать трех или четырех потребителей Kafka в одной коробке, и каждый потребитель может иметь свою собственную группу потребителей, если это необходимо.
Это Javadoc для KafkaConsumer, который я использую как у своего потребителя. И основы на этом article Я создал своего потребителя, просто я использовал абстрактный класс для его расширения. Искать «Взять все это вместе» в этой ссылке.
В документах упоминается, что потребители не являются потокобезопасными, но похоже, что мой код повторно использует один и тот же экземпляр пользователя для каждого потока в пуле.
public ConsumerHandler(Consumer consumer, int poolSize) {
this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
this.consumer = consumer;
for (int i = 0; i < poolSize; i++) {
this.consumers.add(consumer);
executorServiceConsumer.submit(consumer);
}
}
Каков наилучший способ решить эту проблему безопасности потоков и по-прежнему достичь тех же возможностей?
Если это рабочий код, вопрос относится к http://codereview.stackexchange.com/ – jaco0646