2016-11-09 5 views
1

Я следил за этим great tutorial, чтобы использовать прямой поток Twitter в Python, используя tweepy. Это будет печатать твиты в режиме реального времени, в которых упоминаются RxJava, RxPy, RxScala или ReactiveX.RxPy - Включите поток Live Twitter в Rx Observable?

from tweepy.streaming import StreamListener 
from tweepy import OAuthHandler 
from tweepy import Stream 
from rx import Observable, Observer 

#Variables that contains the user credentials to access Twitter API 
access_token = "CONFIDENTIAL" 
access_token_secret = "CONFIDENTIAL" 
consumer_key = "CONFIDENTIAL" 
consumer_secret = "CONFIDENTIAL" 


#This is a basic listener that just prints received tweets to stdout. 
class TweetObserver(StreamListener): 

    def on_data(self, data): 
     print(data) 
     return True 

    def on_error(self, status): 
     print(status) 



if __name__ == '__main__': 

    #This handles Twitter authetification and the connection to Twitter Streaming API 
    l = TweetObserver() 
    auth = OAuthHandler(consumer_key, consumer_secret) 
    auth.set_access_token(access_token, access_token_secret) 
    stream = Stream(auth, l) 

    #This line filter Twitter Streams to capture data by the keywords: 'python', 'javascript', 'ruby' 
    stream.filter(track=['rxjava','rxpy','reactivex','rxscala']) 

Это идеальный кандидат, чтобы превратиться в ReactiveX через RxPy наблюдаемого. Но как именно я превращаю это в горячий источник Observable? Кажется, я не могу найти документацию о том, как выполнить Observable.create() ...

+0

Я помню, что могу выполнить это с помощью Субъекта, и я был успешным там. Но все еще интересно, могу ли я сделать это без субъекта ... – tmn

ответ

0

Я понял это некоторое время назад. Вы должны определить функцию, которая обрабатывает переданный аргумент Observer. Затем вы передаете это Observable.create().

from tweepy.streaming import StreamListener 
from tweepy import OAuthHandler 
from tweepy import Stream 
import json 
from rx import Observable 

# Variables that contains the user credentials to access Twitter API 
access_token = "PUT YOURS HERE" 
access_token_secret = "PUT YOURS HERE" 
consumer_key = "PUT YOURS HERE" 
consumer_secret = "PUT YOURS HERE" 


def tweets_for(topics): 
    def observe_tweets(observer): 
     class TweetListener(StreamListener): 
      def on_data(self, data): 
       observer.on_next(data) 
       return True 

      def on_error(self, status): 
       observer.on_error(status) 

     # This handles Twitter authetification and the connection to Twitter Streaming API 
     l = TweetListener() 
     auth = OAuthHandler(consumer_key, consumer_secret) 
     auth.set_access_token(access_token, access_token_secret) 
     stream = Stream(auth, l) 
     stream.filter(track=topics) 

    return Observable.create(observe_tweets).share() 


topics = ['Britain', 'France'] 

tweets_for(topics) \ 
    .map(lambda d: json.loads(d)) \ 
    .subscribe(on_next=lambda s: print(s), on_error=lambda e: print(e))