2015-09-03 4 views
4

Я написал небольшую программу Scala, которая использует API потоковой передачи Apache Flink для чтения твитов Twitter.IOExcpetion при подключении к Twitter Streaming API с Apache Flink

object TwitterWordCount { 
    private val properties = "/home/twitter-login.properties" 
    def main(args: Array[String]) { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val twitterStream = env.addSource(new TwitterSource(properties)) 
    val tweets = twitterStream 
     .flatMap(new JSONParseFlatMap[String, String] { 
     override def flatMap(in: String, out: Collector[String]): Unit = { 
      if (getString(in, "user.lang") == "en") { 
      out.collect(getString(in, "text")) 
      } 
     } 
     }) 
    tweets.print 
    env.execute("tweets") 
    } 
} 

При выполнении я сталкиваюсь со следующей проблемой:

14:35:48,353 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection 
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20] 
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20] 
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80 
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters]. 
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager 
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters]. 
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[[email protected]) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449]. 
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a. 
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1 
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[[email protected]) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449]. 
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection [email protected] closed 
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection [email protected] shut down 
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS 
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection [email protected] closed 
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20] 
14:35:52,359 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length 
14:35:53,613 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly 
14:35:53,613 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection 
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down 
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down 

Программа пытается повторно установить соединение. Таким образом, эти 4 строки сообщения журнала продолжают излучаться.

Странная вещь в этом, когда я запускаю example, предоставленный в проекте Apache Flink, все работает отлично (я вытащил последнюю версию мастера из GitHub). Я даже использую тот же файл свойств. Если я копирую этот примерный класс в свой собственный проект, возникает также состояние проблемы выше.

Я использовал архетип Flink, чтобы создать свой собственный проект. Я пробовал в версии 0.9.1, а также 0.10-SNAPSHOT. В соответствующей версии используются зависимости flink-scala, flink-streaming-scala, flink-clients и flink-connector-twitter.

Кто-нибудь испытал подобную проблему и может получить меня на правильном пути?

+0

Является ли IOException зарегистрированным где-то? Может быть, на уровне журнала DEBUG? Если нет, возможно, вы можете использовать отладчик, чтобы увидеть исключение, вызванное клиентом Twitter. –

+0

Эй, я использовал ваш код, удалив часть «getstring», я не могу получить твиты, я упоминал проблему здесь и https://stackoverflow.com/questions/47083220/not-able-to-retrieve- the-tweets-using-flink – JSR29

ответ

3

com.twitter.hbc.httpclient.ClientBase отладка привел меня к следующему Exception: org.apache.http.conn.ConnectTimeoutException: Connect to stream.twitter.com:80 timed out

Согласно post на форуме разработчиков Twitter это происходит из-за ошибки в апачей HttpClient 4,2. Фактически, разрешение дерева зависимостей на моем проекте показывает, что время выполнения flink имеет зависимость от com.amazonaws: aws-java-sdk: 1.81, которая снова имеет зависимость от org.apache.httpcomponents: httpclient: 4.2.

Добавление HttpClient 4.2.6 к зависимостям моего проекта временно разрешило проблему.

0

Благодаря @ peedeeX21 ваше решение помогло мне! Добавление явной зависимости от pom.xml решит проблему во время работы от eclipse, но когда вы используете flink-кластер и подаете программу с запуском flink - версия, упакованная с flink distro, все равно выигрывает.

Я решил это, загрузив jQuery httpclient-4.2.6.jar в flink/lib и переименовал его в «a» (ahttpclient-4.2.6.jar), поэтому он будет добавлен в classpath из flink runtime сначала (сделано bin/config.sh) Надеюсь, что это поможет кому-то.

+0

Пожалуйста, не добавляйте «спасибо» в качестве ответа. Вместо этого проголосуйте за ответы, которые вы найдете полезными. - [Из обзора] (/ review/low-quality-posts/11433576) –

+2

«спасибо» была моей первой частью, и да, я проголосовал. Я также добавил, как проблема может быть решена во время выполнения flink. –

 Смежные вопросы

  • Нет связанных вопросов^_^