Я написал небольшую программу Scala, которая использует API потоковой передачи Apache Flink для чтения твитов Twitter.IOExcpetion при подключении к Twitter Streaming API с Apache Flink
object TwitterWordCount {
private val properties = "/home/twitter-login.properties"
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val twitterStream = env.addSource(new TwitterSource(properties))
val tweets = twitterStream
.flatMap(new JSONParseFlatMap[String, String] {
override def flatMap(in: String, out: Collector[String]): Unit = {
if (getString(in, "user.lang") == "en") {
out.collect(getString(in, "text"))
}
}
})
tweets.print
env.execute("tweets")
}
}
При выполнении я сталкиваюсь со следующей проблемой:
14:35:48,353 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters].
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters].
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[[email protected]) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a.
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[[email protected]) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection [email protected] closed
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection [email protected] shut down
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection [email protected] closed
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:52,359 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
14:35:53,613 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly
14:35:53,613 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down
Программа пытается повторно установить соединение. Таким образом, эти 4 строки сообщения журнала продолжают излучаться.
Странная вещь в этом, когда я запускаю example, предоставленный в проекте Apache Flink, все работает отлично (я вытащил последнюю версию мастера из GitHub). Я даже использую тот же файл свойств. Если я копирую этот примерный класс в свой собственный проект, возникает также состояние проблемы выше.
Я использовал архетип Flink, чтобы создать свой собственный проект. Я пробовал в версии 0.9.1, а также 0.10-SNAPSHOT. В соответствующей версии используются зависимости flink-scala
, flink-streaming-scala
, flink-clients
и flink-connector-twitter
.
Кто-нибудь испытал подобную проблему и может получить меня на правильном пути?
Является ли IOException зарегистрированным где-то? Может быть, на уровне журнала DEBUG? Если нет, возможно, вы можете использовать отладчик, чтобы увидеть исключение, вызванное клиентом Twitter. –
Эй, я использовал ваш код, удалив часть «getstring», я не могу получить твиты, я упоминал проблему здесь и https://stackoverflow.com/questions/47083220/not-able-to-retrieve- the-tweets-using-flink – JSR29