0

Моей искры потоковой работы потребляющих данных от КафкиКафки потреблять только новые сообщения

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM), 
         prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap); 

всякого раза, когда я перезапустить свою работу она начать потребляющую от последнего смещения магазина (я предполагаю, что это потому, что это занимает много времени, чтобы отправьте обработанные данные, и если я изменю группу потребителей, она мгновенно работает с новым сообщением)

Я kafka 8.1.1, где auto.offset.reset по умолчанию является самым большим, что означает, что когда я перезапущу kafka, вы отправите данные откуда Я ушел.

В моем случае использования прошу меня игнорировать эти данные и обрабатывать только поступающие данные. Как я могу достичь этого? любое предложение

ответ

2

Существует два способа добиться этого:

  1. создать уникальную потребительскую группу каждый раз при перезапуске, и он будет потреблять от последнего смещения.

  2. Используйте прямой подход вместо приемника; здесь у вас больше контроля над тем, как вы потребляете, но вам придется обновлять zookeeper вручную, чтобы сохранить ваши смещения. В приведенном ниже примере он всегда будет начинаться с последнего смещения.

    import org.apache.spark.streaming.kafka._ 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
    

Документация по прямой подход здесь: https://spark.apache.org/docs/latest/streaming-kafka-integration.html

+0

я найти еще один подход Спарк 1.5 (я проверял), использовать Кафка прямой апи и не использовать контрольно-пропускные пункты. –