Я хотел бы получить ответ от брокера, когда я создаю сообщение. Я пробовал механизм CallBack (путем реализации CallBack), используемый в KafkaProducer.send
, но он не работал и не вызывает метод onCompletion
.
Когда я выключаю сервер Kafka и пытаюсь создать сообщение, он вызывает метод обратного вызова.Как получить подтверждение от брокера Kafka, если сообщение создано продюсером?
Есть ли другой способ получить подтверждение?
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
System.out.println("Called Callback method");
if (metadata != null) {
System.out.println("message(" + key + ", " + message
+ ") sent to partition(" + metadata.partition() + "), "
+ "offset(" + metadata.offset() + ") in " + elapsedTime
+ " ms");
} else {
exception.printStackTrace();
}
}
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "mytopic");
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
long runtime = new Date().getTime();
String ip = "192.168.2."+ rnd.nextInt(255);
String msg = runtime + ".www.ppop.com," + ip;
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`
Я использую kafka-client api 0.9.1 с брокером версии 0.8.2.
Можете ли вы показать код, который вы используете для реализации CallBack. Если этот метод не вызывается, то проблема должна быть где-то в другом месте. – morganw09dev
Я только что обновил свой брокер до 0.9.0.0, и он начал работать. Теперь вопрос будет в том, что новый api не вызовет Callback на версии 0.8.2.x? – usman
См. Мой ответ, но снова. Покажите нам код, который вы используете для отправки данных в Kafka, вероятно, это источник вашей проблемы. Также не рекомендуется использовать разные версии Kafka. – morganw09dev