2015-05-25 4 views
0

У меня проблема с использованием функции updateStateByKey(). Я следующий, простой код (написанный на базе книги: "Обучение Спарк - Освещение быстрого анализа данных"):updateStateByKey, noClassDefFoundError

object hello { 
    def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { 
    Some(runningCount.getOrElse(0) + newValues.size) 
    } 


    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp") 
    val ssc = new StreamingContext(conf, Seconds(4)) 
    ssc.checkpoint("/") 

    val lines7 = ssc.socketTextStream("localhost", 9997) 
    val keyValueLine7 = lines7.map(line => (line.split(" ")(0), line.split(" ")(1).toInt)) 


    val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _) 
    ssc.start() 
    ssc.awaitTermination() 
    } 

} 

Мой build.sbt является:

name := "stream-correlator-spark" 

version := "1.0" 

scalaVersion := "2.11.4" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.3.1" % "provided", 
    "org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided" 
) 

Когда я строю его sbt assembly команда все идет хорошо. Когда я бегу это на свече кластера в локальном режиме я получил сообщение об ошибке:

Исключение в потоке «основной» java.lang.NoClassDefFoundError: орг/апач/искра/потоковое/dstream/DStream $ в привет $ .main (вертолёта .scala: 25) ...

линия 25 является:

val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _) 

Я чувствую, что это может быть какой-то вариант совместимости проблема, но я не знаю, что может быть причиной и как решить эту проблему.

Я был бы очень благодарен за помощь!

+0

Я проверил файл jar, созданный с помощью 'sbt assembly', и он не содержит банок (или классов), связанных с искровым или искровым потоком. Разве 'sbt assembly' не должен содержать все источники, необходимые для работы на любой JVM? – awenclaw

ответ

1

Когда вы пишете "provided" в SBT, это означает, что ваша библиотека предоставляется средой и не нуждается в ее включении в пакет. Попробуйте удалить "provided" след от "spark-streaming" библиотеки.

0

Вы можете добавить «предоставленный» обратно, когда вам нужно отправить приложение в искровой кластер для запуска. Преимущество «предоставления» заключается в том, что в баночке с жиром результата не будут включаться классы из предоставленных зависимостей, что даст гораздо меньшую толстую банку, по сравнению с отсутствием «предоставленного». В моем случае итоговая баночка будет составлять около 90 М без «предоставленного», а затем сжимается до 30 + М с «предоставленным».