2016-12-15 3 views
0

Я снова здесь, я пытаюсь прочитать данные из темы kafka_0.9.0.0 с помощью spark streaming_1. 6.1 класс, написанный на scala -2.10.5. Его простая программа я построил в sbt_0.13.12. Когда я запускаю программу я получаю это исключениеjava.lang.ClassCastException: [B не может быть передан в java.lang.String при разборе json [String, String]

(вводный основные 0) org.apache.spark.SparkException: Работа прервана из стадии недостаточности: Задача 0 в стадии 1,0 не удалось 1 раз, самый последний сбой: Lost Задача 0.0 на этапе 1.0 (TID 1, localhost): java.lang.ClassCastException: [B нельзя отбрасывать в java.lang.String [ошибка] на org.kafka.receiver.AvroCons $ $ anonfun $ 1.Apply (AvroConsumer.scala: 54) [ошибка] в org.kafka.receiver.AvroCons $$ anonfun $ 1.Apply (AvroConsumer.scala: 54) [ошибка] в scala.collection.Iterato r $$ anon $ 11.next (Iterator.scala: 328) [ошибка]
at org.apache.spark.util.Utils $ .getIteratorSize (Utils.scala: 1597) [ошибка] на org.apache.spark .rdd.RDD $$ anonfun $ count $ 1.apply (RDD.scala: 1157) [ошибка] на org.apache.spark.rdd.RDD $$ anonfun $ count $ 1.apply (RDD.scala: 1157) [ошибка] на org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 1858) [ошибка] на org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext .scala: 1858) [ошибка] на org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) [ошибка] на org.apache.spark.scheduler.Task.run (Task.scala : 89) [ошибка] в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) [ошибки] в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145) [ошибка] на java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:615) [ошибка] на java.lang.Thread.run (Thread.java:745) [ошибка] [ошибка ] Driver stacktrace: org.apache.spark.SparkException: Job прервано из-за отказа этапа: Задача 0 на этапе 1.0 не удалась 1 раз, большинство недавний сбой: потерянная задача 0.0 в стадии 1.0 (TID 1, localhost): java. lang.ClassCastException: [B нельзя отбрасывать в java.lang.String

Вот программа Scala,

1 package org.kafka.receiver 
     2 case class mobileData(action: String, tenantid: Int, lat: Float, lon: Float, memberid: Int, event_name: String, productUpccd: Int, device_type: String, device_os_ver: Float, item_na  me: String) 
     3 import java.util.HashMap 
     4 import org.apache.avro.SchemaBuilder 
     5 import org.apache.spark.SparkConf 
     6 import org.apache.spark.SparkContext 
     7 import org.apache.spark.serializer.KryoSerializer 
     8 import org.apache.spark.storage.StorageLevel 
     9 import org.apache.spark.streaming.Seconds 
    10 import org.apache.spark.streaming.StreamingContext 
    11 import org.apache.spark.streaming.StreamingContext._ 
    12 import org.apache.spark.SparkContext._ 
    13 import org.apache.spark.streaming.dstream.ReceiverInputDStream 
    14 import org.apache.spark.streaming.kafka.KafkaUtils 
    15 import kafka.serializer.DefaultDecoder 
    16 import org.apache.spark.sql.SQLContext 
    17 import com.sun.istack.internal.logging.Logger 
    18 object AvroCons { 
    19 val eventSchema = SchemaBuilder.record("eventRecord").fields 
    20  .name("action").`type`().stringType().noDefault() 
    21  .name("tenantid").`type`().intType().noDefault() 
    22  .name("lat").`type`().doubleType().noDefault() 
    23  .name("lon").`type`().doubleType().noDefault() 
    24  .name("memberid").`type`().intType().noDefault() 
    25  .name("event_name").`type`().stringType().noDefault() 
    26  .name("productUpccd").`type`().intType().noDefault() 
    27  .name("device_type").`type`().stringType().noDefault() 
    28  .name("device_os_ver").`type`().stringType().noDefault() 
    29  .name("item_name").`type`().stringType().noDefault().endRecord 
    30  def main(args: Array[String]): Unit = { 
    31 
    32  val sparkConf = new SparkConf().setAppName("Avro Consumer"). 
    33  set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]") 
    34  sparkConf.set("spark.cores.max", "2") 
    35  sparkConf.set("spark.serializer", classOf[KryoSerializer].getName) 
    36  sparkConf.set("spark.sql.tungsten.enabled", "true") 
    37  sparkConf.set("spark.eventLog.enabled", "true") 
    38  sparkConf.set("spark.app.id", "KafkaConsumer") 
    39  sparkConf.set("spark.io.compression.codec", "snappy") 
    40  sparkConf.set("spark.rdd.compress", "true") 
    41  sparkConf.set("spark.streaming.backpressure.enabled", "true") 
    42  sparkConf.set("spark.sql.avro.compression.codec", "snappy") 
    43  sparkConf.set("spark.sql.avro.mergeSchema", "true") 
    44  sparkConf.set("spark.sql.avro.binaryAsString", "true") 
    45  val sc = new SparkContext(sparkConf) 
    46  sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false") 
    47  val ssc = new StreamingContext(sc, Seconds(2)) 
    48  val kafkaConf = Map[String, String]("metadata.broker.list" -> "############:9092", 
    49   "zookeeper.connect" -> "#############", 
    50   "group.id" -> "KafkaConsumer", 
    51   "zookeeper.connection.timeout.ms" -> "1000000") 
    52  val topicMaps = Map("fishbowl" -> 1) 
    53  val messages = KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER) 

    54 messages.print()  
    55 val lines = messages.map(x=>x._2); lines.foreachRDD((rdd,time)=>{ 
    56 val count = rdd.count() 
    57 if(count>0) 
    58 rdd.foreach(record=>{println(record)})}) 
    59 
    60 ssc.start() 
    61  ssc.awaitTermination() 
    62  } 
    63 
    64 } 

А вот мой build.sbt

name := "AvroConsumer" 
version := "1.0" 
scalaVersion := "2.10.6" 
jarName in assembly := "AvroConsumer.jar" 

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"  

libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided" 

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1" 

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.6.1" 

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" 

libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13" 

libraryDependencies += "org.openrdf.sesame" % "sesame-rio-api" % "2.7.2" 

libraryDependencies += "com.databricks" % "spark-csv_2.10" % "0.1" 

libraryDependencies += "org.apache.avro" % "avro" % "1.8.1" 

libraryDependencies += "log4j" % "log4j" % "1.2.17" 

libraryDependencies += "org.apache.avro" % "avro-tools" % "1.7.4" 

assemblyMergeStrategy in assembly := { case PathList("META-INF", xs @ 
_*) => MergeStrategy.discard case x => MergeStrategy.first } 

Я готовлю этот код, чтобы создать DF от темы Кафка, поэтому я имел для установки всех этих свойств в sparkConf(). Вот схема моих входящих данных,

{ 
    "action": "AppEvent", 
    "tenantid": 299, 
    "lat": 0.0, 
    "lon": 0.0, 
    "memberid": 16445, 
    "event_name": "CATEGORY_CLICK", 
    "productUpccd": 0, 
    "device_type": "iPhone", 
    "device_os_ver": "10.1", 
    "item_name": "CHICKEN" 
} 

А вот мой класс продюсер Кафки.

public class KafkaAvroProducer { 

    /* case class 
    TopicData("action":"AppEvent","tenantid":1173,"lat":0.0,"lon":0.0,"memberid":55, 
    "event_name":"CATEGORY_CLICK", 
    "productUpccd":0,"device_type":"iPhone","device_os_ver":"10.1","item_name":"CHICKEN",*/ 

    public static final String EVENT_SCHEMA = "{" + "\"type\":\"record\"," 
      + "\"name\":\"eventrecord\"," + "\"fields\":[" 
      + " { \"name\":\"action\", \"type\":\"string\" }," 
      + " { \"name\":\"tenantid\", \"type\":\"int\" }," 
      + " { \"name\":\"lat\", \"type\":\"double\" }," 
      + " { \"name\":\"lon\", \"type\":\"double\" }," 
      + " { \"name\":\"memberid\", \"type\":\"int\" }," 
      + " { \"name\":\"event_name\", \"type\":\"string\" }," 
      + " { \"name\":\"productUpccd\", \"type\":\"int\" }," 
      + " { \"name\":\"device_type\", \"type\":\"string\" }," 
      + " { \"name\":\"device_os_ver\", \"type\":\"string\" }," 
      + "{ \"name\":\"item_name\", \"type\":\"string\" }" + "]}"; 

    public static void main(String[] args) throws InterruptedException { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "##########:9092"); 
     props.put("key.serializer", 
       "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", 
       "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("producer.type", "async"); 
     Schema.Parser parser = new Schema.Parser(); 
     Schema schema = parser.parse(EVENT_SCHEMA); 
     Injection<GenericRecord, String> avroRecords = GenericAvroCodecs.toJson(schema); 
     KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
     for(int i = 0; i<300;i++){ 
      GenericData.Record avroRecord = new GenericData.Record(schema); 
      setEventValues(i, avroRecord); 
      String messages = avroRecords.apply(avroRecord); 
      ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("fishbowl",String.valueOf(i),messages); 
      System.out.println(producerRecord); 
      producer.send(producerRecord); 

     } 
     producer.close(); 
    } 

    private static void setEventValues(int i, Record avroRecord) { 

     avroRecord.put("action", "AppEvent"); 
     avroRecord.put("tenantid", i); 
     avroRecord.put("lat", i*0.0); 
     avroRecord.put("lon", 0.0); 
     avroRecord.put("memberid", i*55); 
     avroRecord.put("event_name", "CATEGORY_CLICK"); 
     avroRecord.put("productUpccd", 0); 
     avroRecord.put("device_type", "iPhone"); 
     avroRecord.put("device_os_ver", "10.1"); 
     avroRecord.put("item_name", "CHICKEN"); 
    } 

} 
+0

Что тип данные, вставленные в Кафку? – maasg

+0

Я также предоставил вам класс продюсеров, посмотрите изменения. –

+0

Вы уверены, что Kafka пуста к другим сообщениям при запуске этой программы? Исключение говорит о том, что мы пытаемся использовать 'Byte []' как 'String', но десериализаторы Kafka выглядят хорошо настроенными, а также продюсером. Попробуйте обрезать кафку всех сообщений перед тестированием. – maasg

ответ

3

Кафки потребитель должен быть настроен на использование правильного декодера:

Вместо:

KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder] 

Для String должно быть:

KafkaUtils.createStream[String, String,StringDecoder, StringDecoder] 
+0

Спасибо Маасгу за вашу помощь. –

+0

@jackAKAkarthik Для новой проблемы задайте новый вопрос. Не задавайте последующих вопросов по комментариям. – maasg

+0

Я поставил новый вопрос http://stackoverflow.com/questions/41222594/inappro-output-while-creating-a-dataframe –