0

Как читать данные ответа из Twitter Streaming API - статусы POST/фильтр? У меня установлено соединение, и я получаю код статуса 200, но я не знаю, как читать твиты. Я просто хочу напечатать твиты, когда они придут.Playframework и Twitter Streaming API

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200) 
    println(response.body) 
} 

EDIT: Я нашел это решение

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200){ 
    response.body 
     .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .map(json => Try(parse(json).extract[Tweet])) 
     .runForeach { 
     case Success(tweet) => 
      println("-----") 
      println(tweet.text) 
     case Failure(e) => 
      println("-----") 
      println(e.getStackTrace) 
     } 
    } 
} 

ответ

4

Тело ответа для потока WS-запроса - это потоки Akka Source байтов. Поскольку ответы Twitter Api ограничены новой строкой (обычно), вы можете использовать Framing.delimiter, чтобы разбить их на байтовые фрагменты, разобрать куски на JSON и сделать с ними то, что вы хотите. Нечто подобное должно работать:

import akka.stream.scaladsl.Framing 
import scala.util.{Success, Try} 
import akka.util.ByteString 
import play.api.libs.json.{JsSuccess, Json, Reads} 
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken} 

case class Tweet(id: Long, text: String) 
object Tweet { 
    implicit val reads: Reads[Tweet] = Json.reads[Tweet] 
} 

def twitter = Action.async { implicit request => 
    ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016") 
     .sign(OAuthCalculator(consumerKey, requestToken)) 
     .withMethod("POST") 
     .stream().flatMap { response => 
    response.body 
     // Split up the byte stream into delimited chunks. Note 
     // that the chunks are quite big 
     .via(Framing.delimiter(ByteString.fromString("\n"), 20000)) 
     // Parse the chunks into JSON, and then to a Tweet. 
     // A better parsing strategy would be to account for all 
     // the different possible responses, but here we just 
     // collect those that match a Tweet. 
     .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet])) 
     .collect { 
     case Success(JsSuccess(tweet, _)) => tweet.text 
     } 
     // Print out each chunk 
     .runForeach(println).map { _ => 
     Ok("done") 
    } 
    } 
} 

Примечания: материализовать поток, который вы должны будете вводить неявный Materializer в контроллер.

+0

Спасибо за разъяснение – mkovacek

+0

Возможно ли позднее тесное соединение? Я планирую иметь несколько запросов для отслеживания разных слов, и я хочу, чтобы в какое-то время закрывалось определенное соединение? – mkovacek

+1

Посмотрите на [Управление динамическим потоком] (http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html) в документах Akka. Одна идея: создать общий коммутатор kill, а затем добавить его в поток с помощью 'source.via (killSwitch.flow)'. Запуск 'shutdown()' на killswitch должен закрыть соединение. – Mikesname

3

призывающую stream() дает вам обратно Future[StreamedResponse]. вам тогда придется использовать иклоны akka, чтобы конвертировать в него ByteString кусков. что-то вроде:

val stream = ws.url(url) 
    .sign(OAuthCalculator(consumerKey, requestToken)) 
    .withMethod("POST") 
    .stream() 

stream flatMap { res => 
    res.body.runWith(Sink.foreach[ByteString] { bytes => 
    println(bytes.utf8String) 
    }) 
} 

отметить, что я не проверял код, указанный выше (но он основан офф секции отклика потокового из https://www.playframework.com/documentation/2.5.x/ScalaWS плюс описание раковины из http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html)

также отметить, что это будет печатать каждый кусок на своей собственной линии, и я не уверен, возвращает ли API twitter полный json blobs за кусок. вам может потребоваться использовать Sink.fold, если вы хотите скопировать куски перед их печатью.

 Смежные вопросы

  • Нет связанных вопросов^_^