2016-10-27 3 views
2

Я пытаюсь запустить пример потоковой передачи Twitter в Zeppelin. После того, как я обыскал, я добавил «org.apache.bahir: spark-streaming-twitter_2.11: 2.0.0» в Spark Interpreter. Так что я могу сделать первую часть работы, как в:Zeppelin Twitter Streaming Пример не работает

Apache Zeppelin 0.6.1: Run Spark 2.0 Twitter Stream App

Теперь я пытаюсь добавить вторую половину, как:

case class Tweet(createdAt:Long, text:String, screenName:String) 
twt.map(status=> 
    Tweet(status.getCreatedAt().getTime()/1000, status.getText(), status.getUser().getScreenName()) 
).foreachRDD(rdd=> 
    rdd.toDF().registerTempTable("tweets") 
) 

Теперь я получил ошибку:

<console>:56: error: not found: type StreamingContext 
     val ssc = new StreamingContext(sc, Seconds(2)) 
        ^
<console>:56: error: not found: value Seconds 
     val ssc = new StreamingContext(sc, Seconds(2)) 
             ^
<console>:61: error: not found: value Seconds 
     val twt = tweets.window(Seconds(60)) 

На самом деле я добавил строку case, я получил вышеуказанную ошибку. Я действительно не знал, что здесь произошло.

У кого-нибудь есть ключ?

Вот детали Свечи: 2.0.0 Цеппелин: 0.6.2

большое спасибо.

==================================================================================================================================== =======================

// All codes for your reference: 
import org.apache.spark.streaming.twitter 
import org.apache.spark.streaming._ 
import org.apache.spark.storage.StorageLevel 
import scala.io.Source 
import scala.collection.mutable.HashMap 
import java.io.File 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 
import sys.process.stringSeqToProcess 
import org.apache.spark.SparkConf 

// ********************************* Configures the Oauth Credentials for accessing Twitter **************************** 
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {...} 

// ***************************************** Configure Twitter credentials ******************************************** 
val apiKey = ... 
val apiSecret = ... 
val accessToken = ... 
val accessTokenSecret = ... 
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) 

// ************************************************* The logic itself ************************************************* 
val ssc = new StreamingContext(sc, Seconds(2)) 
val tweets = TwitterUtils.createStream(ssc, None) 
val twt = tweets.window(Seconds(60)) 
twt.print 
// above codes work correctly 

// If added the following line, it failed with the above error 
case class Tweet(createdAt:Long, text:String, screenName:String) 

ответ

3

у меня была такая же проблема, и я понятия не имею, почему перемещение операторов импорта сверху прямо перед тем, как новый StreamingContext зафиксировал его, но он это сделал.

import org.apache.spark.streaming._ //moved here from top 
import org.apache.spark.streaming.twitter._ //moved here from top 
val ssc = new StreamingContext(sc, Seconds(2)) //existing 
0

У меня была аналогичная проблема. Использование FQCNs работало нормально, поэтому я в конечном итоге использовал это как обходной путь.

 Смежные вопросы

  • Нет связанных вопросов^_^