Я новичок в компоненте camel-kafka. У меня есть все настройки для отправки и получения сообщений с сервера kafka с использованием компонента camel-kafka. Я использую подобный код, как указано ниже:отправьте подтверждение kafka в компоненте camel-kafka
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
.process(new Processor() {
@Override
public void process(Exchange exchange)
throws Exception {
String messageKey = "";
if (exchange.getIn() != null) {
Message message = exchange.getIn();
Integer partitionId = (Integer) message
.getHeader(KafkaConstants.PARTITION);
String topicName = (String) message
.getHeader(KafkaConstants.TOPIC);
if (message.getHeader(KafkaConstants.KEY) != null)
messageKey = (String) message
.getHeader(KafkaConstants.KEY);
Object data = message.getBody();
System.out.println("topicName :: "
+ topicName + " partitionId :: "
+ partitionId + " messageKey :: "
+ messageKey + " message :: "
+ data + "\n");
/// I perform many other operations here like persist the object in DB etc.
}
}
});
Здесь проблема в том, что я не отправка любой подтверждени Кафке он получает такое же сообщение трижды с сервера. Мой вопрос в том, как я могу отправить подтверждение на kafka вручную? Я не нашел надлежащей документации в компоненте camel-kafka.
Вы можете поместить свое подтверждение в другую очередь, как только вы прочтете сообщение, которое kafta будет слушать, и тем самым ваше сообщение будет отправлено. –