У меня есть следующий класс потока твитов. Он имеет событие TweetReceived, которое может использоваться с другими компонентами моей системы.Наблюдаемый vs FSharpx asyncSeq
Кажется, что все в порядке, но у меня такое ощущение, что это сложнее, чем должно быть.
Есть ли какие-либо инструменты, чтобы дать мне эту функциональность без необходимости самостоятельно реализовать механизм mbox/event?
Также вы бы рекомендовали использовать asyncSeq вместо IObservable?
Спасибо!
type TweetStream (cfg:oauth.Config) =
let token = TwitterToken.Token (cfg.accessToken,
cfg.accessTokenSecret,
cfg.appKey,
cfg.appSecret)
let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")
let event = new Event<_>()
let agent = MailboxProcessor.Start(fun (mbox) ->
let rec loop() =
async {
let! msg = mbox.Receive()
do event.Trigger(msg)
return! loop()
}
loop())
member x.TweetReceived = event.Publish
member x.Start() =
Task.Factory.StartNew(fun() -> stream.StartStream(token, agent.Post))
|> ignore
member x.Stop = stream.StopStream
UPDATE: Благодаря Thomas для быстрой (как всегда) ответ на второй вопрос.
Мой первый вопрос может быть немного неясным, поэтому я реорганизовал код, чтобы сделать класс AgentEvent видимым, и я перефразирую первый вопрос: есть ли способ реализовать логику в AgentEvent проще? Эта логика уже реализована в каком-то месте?
Я прошу об этом, потому что он похож на обычный шаблон использования.
type AgentEvent<'t>()=
let event = new Event<'t>()
let agent = MailboxProcessor.Start(fun (mbox) ->
let rec loop() =
async {
let! msg = mbox.Receive()
do event.Trigger(msg)
return! loop()
}
loop())
member x.Event = event.Publish
member x.Post = agent.Post
type TweetStream (cfg:oauth.Config) =
let token = TwitterToken.Token (cfg.accessToken,
cfg.accessTokenSecret,
cfg.appKey,
cfg.appSecret)
let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")
let agentEvent = AgentEvent()
member x.TweetReceived = agentEvent.Event
member x.Start() =
Task.Factory.StartNew(fun() -> stream.StartStream(token, agentEvent.Post))
|> ignore
member x.Stop = stream.StopStream
Благодарим за быстрый ответ. Я перефразировал первую часть вопроса. Можете ли вы ответить на это? Благодаря! – vidi
Еще одна проблема, с которой я столкнулся при использовании 'Observable', заключается в том, что функции подписчика возвращают' unit' вместо 'Async'. Достаточно просто создать адаптер с помощью 'BlockingQueueAgent', но это уменьшает детерминизм. –
eulerfx