2016-11-24 7 views
4

Я уверен, что я нажимаю только строки данных и десериализую также как String. Запись, которую я нажал, также отображается с ошибкой.Объект не serializable (org.apache.kafka.clients.consumer.ConsumerRecord) в Java spark kafka streaming

Но почему внезапно появляется такой тип ошибок, есть ли что-нибудь, что у меня отсутствует?

Вот ниже код,

import java.util.HashMap; 
    import java.util.HashSet; 
    import java.util.Arrays; 
    import java.util.Collection; 
    import java.util.Iterator; 
    import java.util.Map; 
    import java.util.Set; 
    import java.util.concurrent.atomic.AtomicReference; 
    import java.util.regex.Pattern; 

    import scala.Tuple2; 

    import kafka.serializer.StringDecoder; 

    import org.apache.spark.SparkConf; 
    import org.apache.spark.api.java.JavaPairRDD; 
    import org.apache.spark.api.java.JavaRDD; 
    import org.apache.spark.api.java.function.*; 
    import org.apache.spark.streaming.api.java.*; 
    import org.apache.spark.streaming.kafka.HasOffsetRanges; 
    import org.apache.spark.streaming.kafka10.*; 
    import org.apache.spark.streaming.kafka.OffsetRange; 
    import org.apache.spark.streaming.Duration; 
    import org.apache.spark.streaming.Durations; 

public final class KafkaConsumerDirectStream { 
    public static void main(String[] args) throws Exception { 
     try { 
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]"); 
        sparkConf.set("spark.streaming.concurrentJobs", "3"); 

        // Create the context with 2 seconds batch size 
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

        Map<String, Object> kafkaParams = new HashMap<>(); 
        // kafkaParams.put("metadata.broker.list", "x.xx.xxx.xxx:9091, 
        // x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093"); 

        kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9091"); 
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        kafkaParams.put("group.id", "11_ubiq_12dj"); 
        kafkaParams.put("enable.auto.commit", "true"); 
        kafkaParams.put("auto.commit.interval.ms", "1000"); 
        kafkaParams.put("session.timeout.ms", "30000"); 
        kafkaParams.put("auto.offset.reset", "earliest"); 
        kafkaParams.put("enable.auto.commit", true); 

        Collection<String> topics = Arrays.asList("TopicQueue"); 

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, 
          LocationStrategies.PreferBrokers(), 
          ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)); 

        //stream.print(); 


        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() { 
         @Override 
         public void call(JavaRDD<ConsumerRecord<String, String>> rdd) { 
          final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
          rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() { 
           @Override 
           public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) { 
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; 

            // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges); 
            System.out.println(
              o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); 

           } 
          }); 
         } 
        }); 

        jssc.start(); 
        jssc.awaitTermination(); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       }  
    } 
} 

Ниже ошибки воспитывающих,

16/11/24 00:19:14 ERROR JobScheduler: Error running job streaming job 1479964754000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 30.0 (TID 1500) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = PartWithTopic02Queue, partition = 36, offset = 555, CreateTime = 1479964753779, checksum = 2582644462, serialized key size = -1, serialized value size = 6, key = null, value = Hello0)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at java.lang.Thread.getStackTrace(Thread.java:1117) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
    at java.lang.Thread.run(Thread.java:785) 
+0

Пожалуйста, игнорируйте комментировал код –

+0

Exception повышения выполнения –

ответ

1

Кажется org.apache.spark.streaming.kafka10.*; не работает хорошо. Я использовал только org.apache.spark.streaming.kafka Он отлично работает для меня.

1

apache.kafka.clients.consumer.ConsumerRecord класс не сериализации, которые не могут быть использованы для RMI или как.

+0

До этого он работал очень хорошо. Могут быть некоторые изменения, которые я сделал, поэтому он не работает ... –

+0

@CodeIdenti, прежде чем вы это понимаете? возможно, вы сериализовали нулевой объект? – AntJavaDev

+0

Если в то время в «TopicQueue» нет данных, он работает нормально. Но когда данные, доступные в «TopicQueue», получают эту ошибку. –

0

вам просто нужно добавить public final class KafkaConsumerDirectStream implements java.io.Serializable это работа для меня, хотя использование org.apache.spark.streaming.kafka10.*

надеюсь, что вы можете помочь, спасибо :-)

 Смежные вопросы

  • Нет связанных вопросов^_^