3

Я хотел бы получить ответ от брокера, когда я создаю сообщение. Я пробовал механизм CallBack (путем реализации CallBack), используемый в KafkaProducer.send, но он не работал и не вызывает метод onCompletion.

Когда я выключаю сервер Kafka и пытаюсь создать сообщение, он вызывает метод обратного вызова.Как получить подтверждение от брокера Kafka, если сообщение создано продюсером?

Есть ли другой способ получить подтверждение?

@Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     long elapsedTime = System.currentTimeMillis() - startTime; 
     System.out.println("Called Callback method"); 
     if (metadata != null) { 
      System.out.println("message(" + key + ", " + message 
        + ") sent to partition(" + metadata.partition() + "), " 
        + "offset(" + metadata.offset() + ") in " + elapsedTime 
        + " ms"); 
     } else { 
      exception.printStackTrace(); 
     } 

    } 

props.put("bootstrap.servers", "localhost:9092"); 
props.put("client.id", "mytopic"); 
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class); 
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props); 
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip; 
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));` 

Я использую kafka-client api 0.9.1 с брокером версии 0.8.2.

+0

Можете ли вы показать код, который вы используете для реализации CallBack. Если этот метод не вызывается, то проблема должна быть где-то в другом месте. – morganw09dev

+0

Я только что обновил свой брокер до 0.9.0.0, и он начал работать. Теперь вопрос будет в том, что новый api не вызовет Callback на версии 0.8.2.x? – usman

+0

См. Мой ответ, но снова. Покажите нам код, который вы используете для отправки данных в Kafka, вероятно, это источник вашей проблемы. Также не рекомендуется использовать разные версии Kafka. – morganw09dev

ответ

2

Так что я не уверен на 100%, с какими версиями работают с которыми в Кафке. В настоящее время я использую 0,8,2, я знаю, что 0,9 внес некоторые изменения, но я не мог точно сказать, что теперь работает/не работает.

Одна очень сильная рекомендация, я бы использовал версию Kafka-Client, соответствующую вашей версии брокера. Если вы используете брокер 0.8.2, я бы использовал kakfa-client 0.8.2.

Вы никогда не представляли никакого кода того, как вы используете это, поэтому я просто немного угадываю в темноте. Но я реализовал функцию обратного вызова в Kafka 0.8.2, используя this method в производителе. Ниже приведена подпись метода.

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) 

Где я буду называть этот метод, я действительно перехожу в класс с помощью метода overriden.

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props); 
ProducerRecord<String, String> record = //data to send to kafka 
prod.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception e) { 
    if (e != null) { 
     e.printStackTrace(); 
    } else { 
     //implement logic here, or call another method to process metadata 
     System.out.println("Callback"); 
    } 
    } 
}); 

Я предполагаю, что есть способ сделать это так же, как вы это сделали. Но вам нужно будет предоставить код, показывающий, как вы отправляете записи в Kafka. Кроме этого, я просто догадываюсь.

3

Существует простой способ получить информацию от брокера после публикации сообщения KafkaProducer с использованием версии kafka 0.9. Вы можете позвонить получить() метод, который возвращает объект RecordMetadata который вы можете получить информацию, такие как смещение, topicPartition ниже фрагмента кода в качестве примера:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
         topic, key.getBytes("UTF-8"), message 
           .getBytes("UTF-8"))).get(); 
System.out.println("Message produced, offset: " + m.offset()); 
System.out.println("Message produced, partition : " + m.partition()); 
System.out.println("Message produced, topic: " + m.topic()); 
+3

get() резко снижает производительность по сравнению с асинхронным – chro

2

Закрыть производитель - как только вы сделали все публикации ваши сообщения - как это:

producer.close(); 

Это гарантирует, что следующий метод от обратного вызова всегда вызывается:

onCompletion(RecordMetadata metadata, Exception exception) 

NB: Я испытал это, добавив, что линия т o this sample Producer class и он работает.