2014-12-21 4 views
0

я написал следующий код Scala и моя платформа является Cloudera CDH 5.2.1 на CentOS 6,5не в состоянии выполнить свою программу SparkStreaming

Tutorial.scala

import org.apache.spark 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.streaming.StreamingContext._ 
import TutorialHelper._ 

object Tutorial { 
    def main(args: Array[String]) { 
     val checkpointDir = TutorialHelper.getCheckPointDirectory() 
     val consumerKey = "..." 
     val consumerSecret = "..." 
     val accessToken = "..." 
     val accessTokenSecret = "..." 
     try { 
      TutorialHelper.configureTwitterCredentials(consumerKey, consumerSecret, accessToken, accessTokenSecret) 
      val ssc = new StreamingContext(new SparkContext(), Seconds(1)) 
      val tweets = TwitterUtils.createStream(ssc, None) 
      val tweetText = tweets.map(tweet => tweet.getText()) 
      tweetText.print() 
      ssc.checkpoint(checkpointDir) 
      ssc.start() 
      ssc.awaitTermination() 
     } finally { 
      //ssc.stop() 
     } 
    } 
} 

Мой build.sbt файл выглядит

import AssemblyKeys._ // put this at the top of the file 

name := "Tutorial" 

scalaVersion := "2.10.3" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided", 
    "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" 
) 

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" 

resourceDirectory in Compile := baseDirectory.value/"resources" 

assemblySettings 

mergeStrategy in assembly := { 
    case m if m.toLowerCase.endsWith("manifest.mf")   => MergeStrategy.discard 
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => MergeStrategy.discard 
    case "log4j.properties"         => MergeStrategy.discard 
    case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines 
    case "reference.conf"         => MergeStrategy.concat 
    case _             => MergeStrategy.first 
} 

Я также создал файл с именем проекта/plugin.sbt, который имеет следующее содержание

addSbtPlugin("net.virtual-void" % "sbt-cross-building" % "0.8.1") 
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1") 

и проект/build.scala

import sbt._ 

object Plugins extends Build { 
    lazy val root = Project("root", file(".")) dependsOn(
    uri("git://github.com/sbt/sbt-assembly.git#0.9.1") 
) 
} 

после этого я могу построить свою "убер" сборку с помощью

sbt assembly 

теперь я запускаю мой код, используя

sudo -u hdfs spark-submit --class Tutorial --master local /tmp/Tutorial-assembly-0.1-SNAPSHOT.jar 

Я получаю ошибку

Configuring Twitter OAuth 
     Property twitter4j.oauth.accessToken set as [...] 
     Property twitter4j.oauth.consumerSecret set as [...] 
     Property twitter4j.oauth.accessTokenSecret set as [...] 
     Property twitter4j.oauth.consumerKey set as [...] 

SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
14/12/21 16:04:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
------------------------------------------- 
Time: 1419199472000 ms 
------------------------------------------- 

------------------------------------------- 
Time: 1419199473000 ms 
------------------------------------------- 

14/12/21 16:04:33 ERROR ReceiverSupervisorImpl: Error stopping receiver 0org.apache.spark.Logging$class.log(Logging.scala:52) 
org.apache.spark.streaming.twitter.TwitterReceiver.log(TwitterInputDStream.scala:60) 
org.apache.spark.Logging$class.logInfo(Logging.scala:59) 
org.apache.spark.streaming.twitter.TwitterReceiver.logInfo(TwitterInputDStream.scala:60) 
org.apache.spark.streaming.twitter.TwitterReceiver.onStop(TwitterInputDStream.scala:101) 
org.apache.spark.streaming.receiver.ReceiverSupervisor.stopReceiver(ReceiverSupervisor.scala:136) 
org.apache.spark.streaming.receiver.ReceiverSupervisor.stop(ReceiverSupervisor.scala:112) 
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:127) 
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264) 

ответ

1

Вам необходимо использовать плагин сборки sbt для подготовки «собранного» файла jar со всеми зависимостями. Он должен содержать все классы использования твиттера.

Ссылки:
1. https://github.com/sbt/sbt-assembly
2. http://prabstechblog.blogspot.com/2014/04/creating-single-jar-for-spark-project.html
3. http://eugenezhulenev.com/blog/2014/10/18/run-tests-in-standalone-spark-cluster/

Или вы посмотрите на мой проект искровым Twitter может, он настроил SBT сборки плагина: http://eugenezhulenev.com/blog/2014/11/20/twitter-analytics-with-spark/

+0

Я пробовал все перечисленное в блогах выше. но все равно получена ошибка. Я обновил свой пост, чтобы перечислить свои новые шаги. если возможно, дайте мне знать, где я ошибаюсь ... –

+0

Похоже, что slf4j включен в сборку транзитивной зависимостью от искры, я предлагаю вам использовать плагин зависимости-tree для sbt, чтобы найти источник этой зависимости. Также вы можете посмотреть, как я решил эту проблему, исключая зависимости в sbt: https://github.com/pellucidanalytics/tweet-driven-comparable-companies/blob/master/project/Dependency.scala#L47 –

0

CDH 5.2 пакеты Spark 1.1.0, но вы build.sbt используете 1.0.0. Обновите приведенные ниже версии и перестройте, чтобы исправить вашу проблему.

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided", 
    "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" 
) 

 Смежные вопросы

  • Нет связанных вопросов^_^