я написал следующий код Scala и моя платформа является Cloudera CDH 5.2.1 на CentOS 6,5не в состоянии выполнить свою программу SparkStreaming
Tutorial.scala
import org.apache.spark
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
def main(args: Array[String]) {
val checkpointDir = TutorialHelper.getCheckPointDirectory()
val consumerKey = "..."
val consumerSecret = "..."
val accessToken = "..."
val accessTokenSecret = "..."
try {
TutorialHelper.configureTwitterCredentials(consumerKey, consumerSecret, accessToken, accessTokenSecret)
val ssc = new StreamingContext(new SparkContext(), Seconds(1))
val tweets = TwitterUtils.createStream(ssc, None)
val tweetText = tweets.map(tweet => tweet.getText())
tweetText.print()
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()
} finally {
//ssc.stop()
}
}
}
Мой build.sbt файл выглядит
import AssemblyKeys._ // put this at the top of the file
name := "Tutorial"
scalaVersion := "2.10.3"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
"org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
)
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resourceDirectory in Compile := baseDirectory.value/"resources"
assemblySettings
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
case "log4j.properties" => MergeStrategy.discard
case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
case "reference.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
}
Я также создал файл с именем проекта/plugin.sbt, который имеет следующее содержание
addSbtPlugin("net.virtual-void" % "sbt-cross-building" % "0.8.1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")
и проект/build.scala
import sbt._
object Plugins extends Build {
lazy val root = Project("root", file(".")) dependsOn(
uri("git://github.com/sbt/sbt-assembly.git#0.9.1")
)
}
после этого я могу построить свою "убер" сборку с помощью
sbt assembly
теперь я запускаю мой код, используя
sudo -u hdfs spark-submit --class Tutorial --master local /tmp/Tutorial-assembly-0.1-SNAPSHOT.jar
Я получаю ошибку
Configuring Twitter OAuth
Property twitter4j.oauth.accessToken set as [...]
Property twitter4j.oauth.consumerSecret set as [...]
Property twitter4j.oauth.accessTokenSecret set as [...]
Property twitter4j.oauth.consumerKey set as [...]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/12/21 16:04:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1419199472000 ms
-------------------------------------------
-------------------------------------------
Time: 1419199473000 ms
-------------------------------------------
14/12/21 16:04:33 ERROR ReceiverSupervisorImpl: Error stopping receiver 0org.apache.spark.Logging$class.log(Logging.scala:52)
org.apache.spark.streaming.twitter.TwitterReceiver.log(TwitterInputDStream.scala:60)
org.apache.spark.Logging$class.logInfo(Logging.scala:59)
org.apache.spark.streaming.twitter.TwitterReceiver.logInfo(TwitterInputDStream.scala:60)
org.apache.spark.streaming.twitter.TwitterReceiver.onStop(TwitterInputDStream.scala:101)
org.apache.spark.streaming.receiver.ReceiverSupervisor.stopReceiver(ReceiverSupervisor.scala:136)
org.apache.spark.streaming.receiver.ReceiverSupervisor.stop(ReceiverSupervisor.scala:112)
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:127)
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
Я пробовал все перечисленное в блогах выше. но все равно получена ошибка. Я обновил свой пост, чтобы перечислить свои новые шаги. если возможно, дайте мне знать, где я ошибаюсь ... –
Похоже, что slf4j включен в сборку транзитивной зависимостью от искры, я предлагаю вам использовать плагин зависимости-tree для sbt, чтобы найти источник этой зависимости. Также вы можете посмотреть, как я решил эту проблему, исключая зависимости в sbt: https://github.com/pellucidanalytics/tweet-driven-comparable-companies/blob/master/project/Dependency.scala#L47 –