0

Я запускаю потоковое задание в Spark 2, CDH 5.9 с использованием клиента Kafka 0.8. Простой целью является сохранение информации в Импале, запись по записи.Исправлена ​​ошибка, связанная с ошибкой Kafka из-за InvalidClassException

Я не могу избавиться от этой ошибки, так как я не знаю, откуда она исходит от:

16/12/14 08:43:28 ERROR scheduler.JobScheduler: Error running job streaming 
job 1481726608000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 25.0 failed 4 times, most recent failure: Lost task 0.3 in stage 25.0 
(TID 132, datanode1, executor 1): 
java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; 
local class incompatible: stream classdesc serialVersionUID = 1, 
local class serialVersionUID = 2 

Прямой Кафка поток создается просто

val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2)) 
val kafkaParams = Map[String, String](
    "bootstrap.servers" -> "datanode1:9092,datanode2:9092,datanode3:9092", 
    "group.id" -> "myconsumergroup", 
    "auto.offset.reset" -> "largest") 
val topics:Set[String] = Set("kafkatest") 
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder] (streamingContext, kafkaParams, topics) 

и обработаны по:

val deviceMap = spark.read.parquet("/user/admin/temp/joinData.parquet").cache() 

directKafkaStream.foreachRDD { rdd => 
    val avgData = spark.read.schema(jsonDatastruct).json(rdd.map(i => i._2)).select("data.*").as[JsonInfo] 

    val deviceEnriched = avgData.join(deviceMap,Seq("COMMON_KEY"),"left") 

    deviceEnriched.show(false) 
    spark.sql("use my_database") 
     deviceEnriched.write.mode("append").saveAsTable("tbl_persisted_kafka_stream") 
} 

streamingContext.start() 
streamingContext.awaitTermination() 

ответ

2

Короткий ответ: сообщения были сериализовать с версией commons-lang3 JAR, который несовместим с JAR, который вы используете с Spark.

Длинного ответ: если вы только Гугл, что сообщение об ошибке, а затем обыскала исходный код Apache Commons, вы нашли бы ...

  • this post что зарывается в Java вопрос «класс несовместимой» сериализация, в общем случае
  • исходный код для FastDateFormat о том, что до тех пор, пока serialVersionUID = 1LV3.1, но переход на serialVersionUID = 2L с V3.2 (потому что бинарная структура изменилась в то время)

Кстати, я только что проверил и CDH 5.9 поставляется с commons-lang3 в V3.1 (для улья, Impala, Sentry, Улей-в-Oozie, Sqoop-в-Oozie) и V3.3.2 (для Спарк -in-Oozie) и V3.4 (для Sqoop), в то время как сама искры вовсе не нуждается в этом. Идите фигуру.
И поскольку CDH не поставляется с искрой 2 все же, я думаю, вы либо загрузили «бета» посылку или версию Apache - и я проверил, версию Apache (2.0.2) судов с commons-lang3V3.3.2

Мои 2 цента: просто введите --jars /opt/cloudera/parcels/CDH/jars/commons-lang3-3.1.jar в свою командную строку Spark 2 и посмотрите, достаточно ли этого, чтобы решить вашу проблему.

Редактировать   За 2 дополнительных центов, убедитесь, что ваш «заказ» JAR получает приоритет над любой JAR уже в пряже Classpath с --conf spark.yarn.user.classpath.first=true

+0

Спасибо Самсона. Это решило проблему :) BTW, посылка Spark 2 была выпущена GA на этой неделе из Cloudera и поставляется с ** V3.3.2 **. Как вы правильно сказали: Идите фигуру. Моя основная проблема заключалась в том, что я не мог понять, какой объект был сериализован и откуда и где, но заставив v3.1, как вы указали, решила проблему. –

+0

... некоторое время. Исключение возвращается снова, и не имеет значения, включает ли он ** V3.1 ** или ** V3.3.2 **, исключение всегда одно и то же и в одном и том же узле (Im запускает это на трех узлах). Поэтому я думаю, что это может быть что-то связанное с Спарком, но не связанное с моей работой? Любые другие идеи? –

+0

Остановка этого узла решает проблему, поэтому я предполагаю, что в этом узле есть устаревшая конфигурация. Есть ли способ обновить его? Поскольку это виртуальная машина, у меня возникает соблазн создать ее снова с нуля. –