2014-01-11 4 views
1

У меня есть следующий класс потока твитов. Он имеет событие TweetReceived, которое может использоваться с другими компонентами моей системы.Наблюдаемый vs FSharpx asyncSeq

Кажется, что все в порядке, но у меня такое ощущение, что это сложнее, чем должно быть.

Есть ли какие-либо инструменты, чтобы дать мне эту функциональность без необходимости самостоятельно реализовать механизм mbox/event?

Также вы бы рекомендовали использовать asyncSeq вместо IObservable?

Спасибо!

type TweetStream (cfg:oauth.Config) = 
    let token = TwitterToken.Token (cfg.accessToken, 
            cfg.accessTokenSecret, 
            cfg.appKey, 
            cfg.appSecret) 

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json") 

    let event = new Event<_>() 

    let agent = MailboxProcessor.Start(fun (mbox) -> 
     let rec loop() = 
      async { 
       let! msg = mbox.Receive() 
       do event.Trigger(msg) 
       return! loop() 
      } 
     loop()) 

    member x.TweetReceived = event.Publish 

    member x.Start() = 
     Task.Factory.StartNew(fun() -> stream.StartStream(token, agent.Post)) 
     |> ignore 

    member x.Stop = stream.StopStream 

UPDATE: Благодаря Thomas для быстрой (как всегда) ответ на второй вопрос.

Мой первый вопрос может быть немного неясным, поэтому я реорганизовал код, чтобы сделать класс AgentEvent видимым, и я перефразирую первый вопрос: есть ли способ реализовать логику в AgentEvent проще? Эта логика уже реализована в каком-то месте?

Я прошу об этом, потому что он похож на обычный шаблон использования.

type AgentEvent<'t>()= 
    let event = new Event<'t>() 

    let agent = MailboxProcessor.Start(fun (mbox) -> 
     let rec loop() = 
      async { 
       let! msg = mbox.Receive() 
       do event.Trigger(msg) 
       return! loop() 
      } 
     loop()) 
    member x.Event = event.Publish 
    member x.Post = agent.Post 

type TweetStream (cfg:oauth.Config) = 
    let token = TwitterToken.Token (cfg.accessToken, 
            cfg.accessTokenSecret, 
            cfg.appKey, 
            cfg.appSecret) 

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json") 

    let agentEvent = AgentEvent() 

    member x.TweetReceived = agentEvent.Event 

    member x.Start() = 
     Task.Factory.StartNew(fun() -> stream.StartStream(token, agentEvent.Post)) 
     |> ignore 

    member x.Stop = stream.StopStream 

ответ

3

Я думаю, что IObservable является правильной абстракцией для публикации событий. Что касается их обработки, я бы использовал либо Reactive Extensions, либо F # Agents (MailboxProcessor), в зависимости от того, что вы хотите сделать.

Заметим, что F # автоматически представляет события, как IObservable значений (на самом деле IEvent, но наследует от наблюдаемой), так что вы можете использовать Reactive Extensions непосредственно на TweetReceived.

Какое правильное представление?

  • Основной момент asyncSeq является то, что она позволяет контролировать, как быстро генерируются данные - это как async в том, что вы должны запустить его на самом деле сделать работу и получить значение - так это полезно если вы можете начать какую-либо операцию (например, загрузить следующие несколько байтов), чтобы получить следующее значение

  • IObservable полезен, когда вы не контролируете источник данных - когда он просто продолжает производить значения, и у вас нет возможности приостановить его - это кажется более подходящим для твитов.

Что касается обработки, я думаю, что реактивные расширения хороши, когда они уже реализуют необходимые операции. Когда вам нужно написать какую-то пользовательскую логику (это не так легко выразить в Rx), использование агента - отличный способ написать свои собственные Rx-подобные функции.

+0

Благодарим за быстрый ответ. Я перефразировал первую часть вопроса. Можете ли вы ответить на это? Благодаря! – vidi

+0

Еще одна проблема, с которой я столкнулся при использовании 'Observable', заключается в том, что функции подписчика возвращают' unit' вместо 'Async '. Достаточно просто создать адаптер с помощью 'BlockingQueueAgent', но это уменьшает детерминизм. – eulerfx

 Смежные вопросы

  • Нет связанных вопросов^_^