2015-08-11 5 views
1

У меня есть Process[Task, A], и мне нужно запустить функцию A => B которой время работы в диапазоне от мгновенного до очень долго на каждом A потока с получением Process[Task, B].Асинхронный «узел» в потоке scalaz

Уловка заключается в том, что я хотел бы обработать каждый A как можно скорее в ExecutionContext и передать результат, как только у меня есть, независимо от порядка, в котором получены A s.

Конкретным примером может служить следующий код, в котором я бы надеялся, что все нечетные числа будут напечатаны немедленно, а четные - около 500 м. Что происходит вместо этого является то, что (нечетные, четные) пары печатаются, перемежается паузами 500 мс:

import java.util.concurrent.{TimeUnit, Executors} 
import scala.concurrent.ExecutionContext 

import scalaz.stream._ 
import scalaz.concurrent.Task 

object Test extends App { 
    val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8)) 

    Process.range(0, 100).flatMap { i => 
    Process.eval(Task.apply { 
     if(i % 2 == 0) Thread.sleep(500) 
     i 
    }(executor)) 
    }.to(io.printStreamSink(System.out)(_ println _)) 
    .run.run 

    executor.shutdown() 
    executor.awaitTermination(10, TimeUnit.MINUTES) 
} 

ответ

1

Оказывается, ответ использует каналы. Вот обновленный код, который, как представляется, делает именно то, что я хочу:

import java.util.concurrent.{TimeUnit, Executors} 
import scala.concurrent.ExecutionContext 

import scalaz.stream._ 
import scalaz.concurrent.Task 

object Test extends App { 
    val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8)) 
    val chan = channel.lift[Task, Int, Int] { i => Task { 
    if(i % 2 == 0) Thread.sleep(500) 
    i 
    }} 

    merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i)))) 
    .to(io.printStreamSink(System.out)(_ println _)).run.run 

    executor.shutdown() 
    executor.awaitTermination(10, TimeUnit.MINUTES) 
} 
+1

Важным здесь является использование 'mergeN' для оценки нескольких значений' Process', которые испускаются родительским процессом. Но «Канал» не обязательно нужен; если 'chan' имеет тип' Int => Task [Int] ', вы можете использовать его следующим образом:' merge.mergeN (8) (Process.range (0, 100) .map (i => Process.eval (чан (я)))) ' –

 Смежные вопросы

  • Нет связанных вопросов^_^