2016-12-15 1 views
0

Я пишу пакетное задание, чтобы воспроизвести события из Кафки. Кафка v. 0.10.1.0 и искра 1.6.Создание JavaPairRDD с использованием KafkaUtils.createRDD (искра и кафка)

Я пытаюсь использовать JavaPairRDD javaPairRDD = KafkaUtils.createRDD (...) вызов:

Properties configProperties = new Properties(); 
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.1.194:9092"); 
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
     org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); 
for (String topic : topicNames) { 
      List<PartitionInfo> partitionInfos = producer.partitionsFor(topic); 
    for (PartitionInfo partitionInfo : partitionInfos) { 
       log.debug("partition leader id: {}", partitionInfo.leader().id()); 
     JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); 
     Map<String, String> kafkaParams = new HashMap(); 
     kafkaParams.put("metadata.broker.list", "10.4.1.194:9092"); 
     kafkaParams.put("zookeeper.connect", "10.4.1.194:2181"); 
     kafkaParams.put("group.id", "kafka-replay"); 
     OffsetRange[] offsetRanges = new OffsetRange[]{OffsetRange.create(topic, partitionInfo.partition(), 0, Long.MAX_VALUE)}; 

     JavaPairRDD<String, String> javaPairRDD = KafkaUtils.createRDD(
         sparkContext, 
         String.class, 
         String.class, 
         StringDecoder.class, 
         StringDecoder.class, 
         kafkaParams, 
         offsetRanges); 

     javaPairRDD 
       .map(t -> getInstrEvent(t._2)) 
       .filter(ie -> startTimestamp <= ie.getTimestamp() && ie.getTimestamp() <= endTimestamp) 
       .foreach(s -> System.out.println(s)); 
    } 
} 

Однако он терпит неудачу с ошибкой:

2016-12-14 15:45:44,700 [main] ERROR  com.goldenrat.analytics.KafkaToHdfsReplayMain - error 
org.apache.spark.SparkException: Offsets not available on leader:  OffsetRange(topic: 'sfs_create_room', partition: 0, range: [1 -> 100]) 
    at  org.apache.spark.streaming.kafka.KafkaUtils$.org$apache$spark$streaming$kaf ka$KafkaUtils$$checkOffsets(KafkaUtils.scala:200) 
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:253) 
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:249) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:249) 
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:338) 
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:333) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:333) 
    at org.apache.spark.streaming.kafka.KafkaUtils.createRDD(KafkaUtils.scala) 
    at com.goldenrat.analytics.KafkaToHdfsReplayMain$KafkaToHdfsReplayJob.start(KafkaToHdfsReplayMain.java:172) 

я могу использовать другие клиенты чтобы связаться с брокером и получить сообщения, поэтому я знаю, что это не брокер. Любая помощь?

ответ

0

Похоже, вы не можете указать несуществующее смещение для вашего диапазона. Я надеялся, что смогу получить все смещения, указав 0 до Long.MAX_VALUE, но это не удастся, если смещение недействительно с этим сообщением об ошибке. Если я укажу допустимое смещение (мин/макс) для диапазона, оно работает. Для всех, кто наткнулся на это, вы можете получить их с чем-то вроде:

 Properties configProperties = new Properties(); 
     configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.1.194:9092"); 
     configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
     configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
     org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); 
     for (String topic : topicNames) { 
      offsets.get(topic).getMinimum(), offsets.get(topic).getMaximum()); 
      log.debug("doing topic: {}", topic); 
      List<PartitionInfo> partitionInfos = producer.partitionsFor(topic); 
      for (PartitionInfo partitionInfo : partitionInfos) { 

       TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionInfo.partition()); 
       Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); 
       SimpleConsumer consumer = new SimpleConsumer("10.4.1.194", 9092, 10000, 64 * 1024, "kafka-replay"); 

       requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)); 
       kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "kafka-replay"); 
       OffsetResponse response = consumer.getOffsetsBefore(request); 
       if (response.hasError()) { 
        log.error("error, " + response.errorCode(topic, partitionInfo.partition())); 
       } 
       long[] earliestOffsetsArray = response.offsets(topic, partitionInfo.partition()); 


       requestInfo = new HashMap<>(); 
       requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)); 
       request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "kafka-replay"); 

       response = consumer.getOffsetsBefore(request); 
       if (response.hasError()) { 
        log.error("error, " + response.errorCode(topic, partitionInfo.partition())); 
       } 
       long[] latestOffsetsArray = response.offsets(topic, partitionInfo.partition()); 

       ...