Я пытаюсь представить искрообразование + кафка, которое просто читает строки строки из темы кафки. Тем не менее, я получаю следующее исключениеDSE Spark Streaming + Kafka NoSuchMethodError
15/07/24 22:39:45 ОШИБКА TaskSetManager: Задача 0 на этапе 2.0 не удалась 4 раза; прерывание работы Исключение из потока «Thread-49» org.apache.spark.SparkException: Работа прерывается из-за срыва этапа: Задача 0 на этапе 2.0 не удалась 4 раза, последний сбой: потерянная задача 0.3 на этапе 2.0 (TID 73, 10.11 .112.93): java.lang.NoSuchMethodException: kafka.serializer.StringDecoder. (Kafka.utils.VerifiableProperties) java.lang.Class.getConstructor0 (Class.java:2892) java.lang.Class.getConstructor (класс.java : 1723) org.apache.spark.streaming.kafka.KafkaReceiver.onStart (KafkaInputDStream.scala: 106) org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver (ReceiverSupervisor.scala: 121) org.apache. spark.streaming.receiver.ReceiverSupervisor.start (ReceiverSupervisor.scala: 106) org.apache.spark.streaming.scheduler.ReceiverTracker $ ReceiverLauncher $$ ano nfun $ 9.apply (ReceiverTracker.scala: 264) org.apache.spark.streaming.scheduler.ReceiverTracker $ ReceiverLauncher $$ anonfun $ 9.apply (ReceiverTracker.scala: 257) org.apache.spark.SparkContext $$ anonfun $ runJob $ 4.apply (SparkContext.scala: 1121) org.apache.spark.SparkContext $$ anonfun $ runJob $ 4.apply (SparkContext.scala: 1121) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala : 62) org.apache.spark.scheduler.Task.run (Task.scala: 54) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 177) java.util.concurrent. ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:615) java.lang.Thread.run (Thread.java:7 45)
Когда я проверил файлы искр, используемые DSE, я вижу, что он использует kafka_2.10-0.8.0.jar, у которого есть этот конструктор. Не уверен, что вызывает ошибку. Вот мой код потребитель
val sc = new SparkContext(sparkConf)
val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
val topicMap = kafkaTopics.split(",").map((_, numThreads.toInt)).toMap
val accessLogsStream = KafkaUtils.createStream(streamingContext, zooKeeper, "AccessLogsKafkaAnalyzer", topicMap)
val accessLogs = accessLogsStream.map(_._2).map(log => ApacheAccessLog.parseLogLine(log).cache()
UPDATE Это исключение, кажется, происходит только тогда, когда я представляю работу. Если я использую искровую оболочку для запуска задания, вставив код, он отлично работает