0

Работа с Kafka (v2.11-0.10.1.0) -spark-streaming (v-2.0.1-bin-hadoop2.7).Как получить смещение потребителя Кафки?

У меня есть Kafka Producer и Spark-streaming потребитель для производства и потребления. Все работает нормально, пока я не остановлю потребителя (около 2 минут) и начните снова. Потребитель начинает и читает данные, абсолютно совершенные. Но я потерялся с данными за 2 минуты, когда потребитель отключился.

Kafka consumer/server.properties не изменился.

производитель Кафка со свойствами:

  Properties properties = new Properties(); 
      properties.put("bootstrap.servers", AppCoding.KAFKA_HOST); 
      properties.put("auto.create.topics.enable", true); 
      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      properties.put("retries", 1); 
      logger.info("Initializing Kafka Producer."); 
      Producer<String, String> producer = new KafkaProducer<>(properties); 
      producer.send(new ProducerRecord<String, String>(AppCoding.KAFKA_TOPIC, "", documentAsString)); 

Потребляя с помощью искровой потокового API, как:

 SparkConf sparkConf = new SparkConf().setMaster(args[4]).setAppName("Streaming"); 

     // Create the context with 60 seconds batch size 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(60000 * 5)); 

     //input arguments:localhost:2181 sparkS incoming 10 local[*] 

     Set<String> topicsSet = new HashSet<>(Arrays.asList(args[2].split(";"))); 
     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", args[0]); 
     //input arguments: localhost:9092 "" incoming 10 local[*] 

     JavaPairInputDStream<String, String> kafkaStream = 
       KafkaUtils.createDirectStream(jssc, 
         String.class, 
         String.class, 
         StringDecoder.class, 
         StringDecoder.class, 
         kafkaParams, 
         topicsSet); 

На другом конце я использую ActiveMQ. В то время как ActiveMQ Consumer мог получать данные во время их отключения. Помогите мне, если есть проблема с запутанностью.

ответ

0

В Kafka потребители фактически не имеют прямых отношений с производителями. У каждого потребителя есть смещение, которое отслеживает то, что было потреблено в разделах. Если у потребителя нет отслеживаемого смещения, Kafka автоматически сбросит его смещение на самое большое из-за значения по умолчанию config 'auto.offset.reset'. В вашем случае, когда новый потребитель запускается из-за политики по умолчанию, он не видит сообщения, созданные ранее. Вы можете установить auto.offset.reset раньше всех (для нового потребителя) или самого маленького (для старого потребителя).

+0

Спасибо. Нет такой величины смещения, как раньше. Я должен использовать только малый/самый большой. Самый маленький: - дает все данные каждый раз, когда я запускаю потребитель. В результате программа снова и снова будет выполнять одни и те же данные. Самый большой: - в любом случае дает самое последнее значение по умолчанию. Я ищу подход, который потребитель мог бы прочитать, откуда он ушел. – srikanth

+0

Потребитель прочитает, откуда он ушел автоматически. Config 'auto.offset.reset' предназначен для случаев отсутствия смещения инициализации или смещения вне диапазона. – amethystic

0

Kafka поддерживает смещение на каждую запись на каждую запись. В то время как потребитель отключился на 2-минутную продолжительность, значение смещения будет храниться в метаданных темы для нового потребителя, и снова, когда потребитель будет возвращен через 2 минуты, он будет читать последнее смещение, которое было сохранено в теме kafka.

Я думаю, что вам нужно проверить kafka broker data retention policy, если он меньше 2 минут, данные будут потеряны, если данных, соответствующих смещению, нет, он начнет считывать из последних, поскольку значение по умолчанию установлено на последнее значение auto.offset.reset=latest для поступления новых данных.

Я бы предложил проверить и изменить политику хранения данных кафки соответственно, если она меньше 2 минут