Поскольку я новичок в kafka, моя группа использует Spring-Integration-kafka: 2.0.0.RELEASE в приложении Spring-boot. Я смог использовать kafka сообщение с моим KafkaConsumer на основе этого примера here.Невозможно отключить ручные комманды в сообщении Kafka с помощью Spring-integration-kafka в приложении Spring-Boot
Для этого пружинного загрузки приложения, у меня есть Application.java
package hello;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.concurrent.TimeUnit;
import hello.notification.Listener;
@SpringBootApplication
public class Application {
private Listener listener;
public void run(String... args) throws Exception {
this.listener.countDownLatch1.await(60, TimeUnit.SECONDS);
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Это мой KafkaConsumerConfig.java
package hello.notification;
import ....; //different imports
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(propsMap);
}
@Bean
public Listener listener() {
return new Listener();
}
}
и, наконец, мой Listener.java
package hello.notification;
import ...; // different imports
public class Listener {
public final CountDownLatch countDownLatch1 = new CountDownLatch(1);
@KafkaListener(topics = "topic1")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
System.out.println(record + " and " + ack);
// Place holder for "ack.acknowledge();"
countDownLatch1.countDown();
}
}
мои вопросы:
1) В настройках ConsumerConfig я уже установил «ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG» на false и прокомментировал «ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG», почему он все еще совершает автоматическую передачу? (Поскольку я больше не увижу сообщение, если я перезапущу мое приложение с пружинным загрузочным устройством - я бы ожидал, что он попытается выполнить повторную передачу до тех пор, пока это не будет совершено)
2) Чтобы совершить ручную операцию, мне кажется, мне просто нужно добавить ack.acknowledge() внутри моего Listener.listen (пожалуйста см. место-держатель)? Есть ли что-нибудь еще, что мне понадобится, кроме этого, чтобы это было ручной фиксацией?
3) Я хотел бы иметь самый простой/чистый kafkaconsumer, интересно, думаете ли вы, что у меня есть самый простой способ, изначально я искал однопотокового потребителя, но примеров не так много, поэтому я придерживаюсь с одновременным слушателем.
Спасибо за вашу помощь и ввод!
Спасибо за обратную связь. из того, что вы сказали, это похоже на единственное изменение, которое мне нужно сделать для выполнения ручной фиксации, находится в файле «KafkaConsumerConfig.java». Я попытался добавить "factory.getContainerProperties(). SetAckMode (AckMode.MANUAL_IMMEDIATE);" прямо перед тем, как я верну свою фабрику и повторил, но это не сработало, при перезапуске мое сообщение все еще не появляется.Могу ли я спросить, можете ли вы предоставить образец, используя мой код? Благодаря! –
Ну, попробуйте добавить 'propsMap.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,« самое раннее »),' to 'consumerFactory'. По умолчанию это 'last': https://kafka.apache.org/documentation/#consumerconfigs. –
Я добавил propsMap.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, «самое раннее»); а также в классе Listener мы обеспечили, что мы реализуем AcknowledgeingMessageListener и переопределяем метод onMessage, но все же не везет с возвратом сообщения, он чувствует, что он автоматически совершает сам по себе до конца onMessage(): Открытый класс Listener реализует AcknowledgingMessageListener { @Override public void onMessage (ConsumerRecord , ?> record, Acknowledgement ack) { System.out.println (запись + "и" + ack); } } –