2016-06-14 1 views
1

Сценарий, который я хочу реализовать, - это сообщение от Kafka, обрабатывающее его, если какое-то условие не работает, я не хочу признавать сообщение. Для этого я нашел в справочной документации по потоку весеннего облака,Руководство Подтверждение сообщений: Spring Cloud Stream Kafka

autoCommitOffset Независимо от того, следует ли автокомментировать смещения, когда сообщение обработано. Если установлено значение false, заголовок подтверждения будет доступен в заголовках сообщений для позднего подтверждения.

По умолчанию: true.

Мой вопрос после установки autoCommitOffset на false, как я могу подтвердить сообщение? Пример кода был бы чрезвычайно оценен.

+0

Я не уверен, что " Sprint Cloud Stream Kafka "есть. Однако, если вы используете обычный клиент Kafka, он должен предоставить метод, называемый «commit()» или аналогичный для совершения смещений вручную. –

ответ

3

Я дала ответ на вопрос здесь https://github.com/spring-cloud/spring-cloud-stream/issues/575

По существу сводится к установке spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

, а затем обработки заголовка подтверждения:

@SpringBootApplication 
@EnableBinding(Sink.class) 
    public class ManuallyAcknowdledgingConsumer { 

     public static void main(String[] args) { 
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args); 
     } 

     @StreamListener(Sink.INPUT) 
     public void process(Message<?> message) { 
     System.out.println(message.getPayload()); 
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
     if (acknowledgment != null) { 
      System.out.println("Acknowledgment provided"); 
      acknowledgment.acknowledge(); 
     } 
    } 
} 
+0

Спасибо, Мариус. Также, пожалуйста, сообщайте операторам импорта, чтобы узнать API класса подтверждения. Хотелось посмотреть, есть ли способ не принимать сообщения. –

+0

Это ответ. Спасибо, Мариус! –