У меня проблема с использованием функции updateStateByKey(). Я следующий, простой код (написанный на базе книги: "Обучение Спарк - Освещение быстрого анализа данных"):updateStateByKey, noClassDefFoundError
object hello {
def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
Some(runningCount.getOrElse(0) + newValues.size)
}
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
val ssc = new StreamingContext(conf, Seconds(4))
ssc.checkpoint("/")
val lines7 = ssc.socketTextStream("localhost", 9997)
val keyValueLine7 = lines7.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)
ssc.start()
ssc.awaitTermination()
}
}
Мой build.sbt является:
name := "stream-correlator-spark"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.3.1" % "provided"
)
Когда я строю его sbt assembly
команда все идет хорошо. Когда я бегу это на свече кластера в локальном режиме я получил сообщение об ошибке:
Исключение в потоке «основной» java.lang.NoClassDefFoundError: орг/апач/искра/потоковое/dstream/DStream $ в привет $ .main (вертолёта .scala: 25) ...
линия 25 является:
val statefullStream = keyValueLine7.updateStateByKey(updateStateFunction _)
Я чувствую, что это может быть какой-то вариант совместимости проблема, но я не знаю, что может быть причиной и как решить эту проблему.
Я был бы очень благодарен за помощь!
Я проверил файл jar, созданный с помощью 'sbt assembly', и он не содержит банок (или классов), связанных с искровым или искровым потоком. Разве 'sbt assembly' не должен содержать все источники, необходимые для работы на любой JVM? – awenclaw