2014-02-11 2 views
1

У меня вопрос о структурах данных, которые содержат операции async. Это может показаться странным.Структура данных (seq, list, array) асинхронных операций

TestActor содержит MailBoxProcessor и имеет три функции: Receive готовит процессор почтового ящика для приема сообщений Post и PostAndAsyncReply используются для отправки сообщений на актер.

type TestActor (init, timeout) = 
    let mutable counter = init 
    let rcvFun = fun (msg) -> async { 
      match msg with 
      | Add i -> 
       counter <- counter + i 
      | GetCounter reply -> 
       reply.Reply counter} 
    do printfn "Initializing actors: " 
    do mailbox.Receive (rcvFun, timeout) ////// RECEIVE IS CALLED AT CONSTRUCTION 

    let mailbox = OnlyLatestMBP<TestMessage>() 

    member x.Receive (timeout) =  
     mailbox.Receive (rcvFun, timeout) 

    member x.Post (msg: TestMessage, timeout) = 
     mailbox.Post(msg, timeout) 

    member x.PostAndAsyncReply (replyChannel, timeout) = 
     mailbox.PostAndAsyncReply(replyChannel, timeout) 

Я хотел бы использовать этот пример, чтобы понять проблему, которая повлияла на мой код. В обычном примере для stacking agents in a data structure, Receive выполнен при строительстве. В моем примере агент может быть протестирован с помощью кода ниже:

let actorsWorkforce = 
    seq { 1 .. 5} 
    |> Seq.map (fun idx -> TestActor(idx, 60000)) 

let test = 
    actorsWorkforce 
    |> Seq.map (fun idx -> idx.PostAndAsyncReply ((fun reply -> GetCounter reply), 10000)) 
    |> Async.Parallel 
    |> Async.RunSynchronously 


let result = 
    test 
    |> Array.iteri (fun idx element -> 
     match element with 
     | Some x -> printfn "Actor %i: OK with result %A" idx x 
     | None -> printfn "Actor %i: Failed" idx) 

И это работает в соответствии с планом.

Однако предположим, что я хотел бы отложить звонок до Receive на более поздний этап.

type TestActor (init) = 
    let mutable counter = init 
    let rcvFun = fun (msg) -> async { 
      match msg with 
      | Add i -> 
       counter <- counter + i 
      | GetCounter reply -> 
       reply.Reply counter} 

    let mailbox = OnlyLatestMBP<TestMessage>() 

    member x.Receive (timeout) =  
     mailbox.Receive (rcvFun, timeout) 

    member x.Post (msg: TestMessage, timeout) = 
     mailbox.Post(msg, timeout) 

    member x.PostAndAsyncReply (replyChannel, timeout) = 
     mailbox.PostAndAsyncReply(replyChannel, timeout) 


let actorsWorkforce = 
    seq { 1 .. 5} 
    |> Seq.map (fun idx -> TestActor(idx)) 

actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000)) 

let test = 
    actorsWorkforce 
    |> Seq.map (fun idx -> idx.PostAndAsyncReply ((fun reply -> GetCounter reply), 10000)) 

    |> Async.Parallel 
    |> Async.RunSynchronously 


let result = 
    test 
    |> Array.iteri (fun idx element -> 
     match element with 
     | Some x -> printfn "Actor %i: OK with result %A" idx x 
     | None -> printfn "Actor %i: Failed" idx) 

Этот фрагмент кода компилируется, но не работает. mailbox.Receive имеет тип подписи. Получать: callback:('a -> Async<unit>) * ?timeout:int -> unit, поэтому имело смысл выполнить Receive с Seq.iter. Я подозреваю, что код не работает, потому что actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000)) дублирует actorsWorkforce при исполнении.

Это правильно? Как я могу это исправить? Спасибо!

EDIT

Весь код:

open System 
open System.Diagnostics 
open Microsoft.FSharp.Control 
open System.Threading 
open System.Threading.Tasks 
open System.Collections.Concurrent 


//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 
// OnlyLatest 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 

type Envelope<'a> = Option<DateTime * 'a> 

[<Sealed>] 
type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) = 
    member x.Reply(reply) = replyf(reply) 


[<Sealed>] 
type OnlyLatestMBP<'a>() = 

    let mutable currentEnvelope: Envelope<'a> = Envelope<'a>.None 
    let mutable timestampLastPrcsd: DateTime = DateTime.Now 
    let mutable react = Unchecked.defaultof<_> 

    // Msg Box status 
    let mutable isActive = false 
    let mutable defaultTimeout = Timeout.Infinite 

    // Event Messages 
    let awaitMsg = new AutoResetEvent(false) 
    let isActiveMsg = new AutoResetEvent(false) 

    let rec await timeout = async { 
     let thr = Thread.CurrentThread.ManagedThreadId 
     printfn "await on thread %i" thr 
     match currentEnvelope with 
     | Some (timestamp, x) -> 
      if timestamp > timestampLastPrcsd then 
       do! react x 
       timestampLastPrcsd <- timestamp    
       printfn "processed message" 
      currentEnvelope <- Envelope.None 
      awaitMsg.Reset() |> ignore 
      return! await timeout 
     | None -> 
      let! recd = Async.AwaitWaitHandle(awaitMsg, timeout) 
      if recd 
      then return! await timeout  
      else 
       isActive <- false 
       isActiveMsg.Reset() |> ignore 
       printfn ".. no message within timeout, shutting down" } 

    member x.DefaultTimeout 
     with get() = defaultTimeout 
     and set(value) = defaultTimeout <- value 

    member x.Receive (callback, ?timeout) = 
     if not isActive then 
      isActive <- true 
      isActiveMsg.Set() |> ignore 
     let timeout = defaultArg timeout defaultTimeout 
     react <- callback 
     let todo = await timeout 
     Async.Start todo 

    member x.Post (msg, ?timeout) = async { 
     let thr = Thread.CurrentThread.ManagedThreadId 
     printfn "posting on thread %i" thr 
     let timeout = defaultArg timeout defaultTimeout 
     if not isActive then 
      let! recd = Async.AwaitWaitHandle(isActiveMsg, timeout) 
      if recd then 
       currentEnvelope <- Envelope.Some(DateTime.Now, msg) 
       awaitMsg.Set() |> ignore  
       return true 
      else return false 
     else    
      currentEnvelope <- Envelope.Some(DateTime.Now, msg) 
      awaitMsg.Set() |> ignore 
      return true } 

    member x.PostAndAsyncReply (replyChannelMsg, ?timeout) = async { 
     let timeout = defaultArg timeout defaultTimeout 
     let tcs = new TaskCompletionSource<_>() 
     let msg = replyChannelMsg (new AsyncReplyChannel<_> (fun reply -> tcs.SetResult(reply))) 
     let! posted = x.Post (msg,timeout) 
     if posted then 
      match timeout with 
      | Timeout.Infinite -> 
       let! result = Async.FromContinuations (fun (cont, _, _) -> 
        let apply = fun (task: Task<_>) -> cont (task.Result) 
        tcs.Task.ContinueWith(apply) |> ignore) 
       return Some result 
      | _ -> 
       let waithandle = tcs.Task.Wait(timeout) 
       match waithandle with 
       | false -> return None 
       | true -> return Some tcs.Task.Result 
     else return None } 



type TestMessage = 
    | Add of int 
    | GetCounter of AsyncReplyChannel<int> 


type TestActor (init) = 
    let mutable counter = init 
    let rcvFun = fun (msg) -> async { 
      match msg with 
      | Add i -> 
       counter <- counter + i 
      | GetCounter reply -> 
       reply.Reply counter} 


    let mailbox = OnlyLatestMBP<TestMessage>() 
// do printfn "Initializing actors: " 
// do mailbox.Receive (rcvFun, timeout) 

    member x.Receive (timeout) =  
     mailbox.Receive (rcvFun, timeout) 

    member x.Post (msg: TestMessage, timeout) = 
     mailbox.Post(msg, timeout) 

    member x.PostAndAsyncReply (replyChannel, timeout) = 
     mailbox.PostAndAsyncReply(replyChannel, timeout) 




let actorsWorkforce = 
    seq { 1 .. 5} 
    |> Seq.map (fun idx -> TestActor(idx)) 


actorsWorkforce |> Seq.iter (fun actor -> actor.Receive (60000)) 


let test = 
    actorsWorkforce 
    |> Seq.map (fun idx -> idx.PostAndAsyncReply ((fun reply -> GetCounter reply), 10000)) 
    |> Async.Parallel 
    |> Async.RunSynchronously 


let result = 
    test 
    |> Array.iteri (fun idx element -> 
     match element with 
     | Some x -> printfn "Actor %i: OK with result %A" idx x 
     | None -> printfn "Actor %i: Failed" idx) 
+0

Этот код не является полным - определения для 'OnlyLatestMBP' и' TestMessage' отсутствуют. Хотя я предполагаю, что переход на 'let actorsWorkforce() ...' может работать. –

+0

Tnx для изучения этого. Я добавил полный код. Вопрос в основном заключается в том, как применять «Получить» к структуре данных участников. – NoIdeaHowToFixThis

+0

Ничего себе, это код! –

ответ

1

Как изначально подозревал, этот вопрос действительно с: actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))

Проблема была из-за the lazy nature of seq

Я произвел пример суженного минимального кода.

open System 
open System.Diagnostics 
open Microsoft.FSharp.Control 
open System.Threading 
open System.Threading.Tasks 
open System.Collections.Concurrent 

type TestActress (name, timerlength) = 
    let mutable isActive = false 
    let rec myTask() = async { 
     Thread.Sleep (timerlength * 1000) 
     printfn "%s waited : %i" name timerlength 
     return! myTask() } 
    member this.Start() = 
     isActive <- true 
     Async.Start (myTask()) 
    member this.GetStatus() = async { 
     Thread.Sleep (2000) 
     return isActive } 

// One single element, this is easy 
let cameronDiaz = TestActress ("Cameron", 10) 
cameronDiaz.Start() 
let status = cameronDiaz.GetStatus() |> Async.RunSynchronously  

// Async.Parallel receives a seq<Async<'T>> as an input 
// This is why I started off with a seq 
// This piece of code does not work 
let hollywood = 
    [ "Cameron"; "Pamela"; "Natalie"; "Diane" ] 
    |> List.toSeq 
    |> Seq.mapi (fun idx el -> TestActress (el, idx + 10)) 
hollywood |> Seq.iter (fun el -> el.Start()) 
let areTheyWorking = 
    hollywood 
    |> Seq.map (fun el -> el.GetStatus()) 
    |> Async.Parallel 
    |> Async.RunSynchronously 

// Allright, with a list I get the function executed when I expect them to 
let hollywood2 = 
    [ "Cameron"; "Pamela"; "Natalie"; "Diane" ] 
    |> List.mapi (fun idx el -> TestActress (el, idx + 10)) 
hollywood2 |> List.iter (fun el -> el.Start()) 
let areTheyWorking2 = 
    hollywood2 
    |> List.map (fun el -> el.GetStatus()) 
    |> Async.Parallel 
    |> Async.RunSynchronously 

 Смежные вопросы

  • Нет связанных вопросов^_^