У меня есть Process[Task, A]
, и мне нужно запустить функцию A => B
которой время работы в диапазоне от мгновенного до очень долго на каждом A
потока с получением Process[Task, B]
.Асинхронный «узел» в потоке scalaz
Уловка заключается в том, что я хотел бы обработать каждый A
как можно скорее в ExecutionContext
и передать результат, как только у меня есть, независимо от порядка, в котором получены A
s.
Конкретным примером может служить следующий код, в котором я бы надеялся, что все нечетные числа будут напечатаны немедленно, а четные - около 500 м. Что происходит вместо этого является то, что (нечетные, четные) пары печатаются, перемежается паузами 500 мс:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task
object Test extends App {
val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
Process.range(0, 100).flatMap { i =>
Process.eval(Task.apply {
if(i % 2 == 0) Thread.sleep(500)
i
}(executor))
}.to(io.printStreamSink(System.out)(_ println _))
.run.run
executor.shutdown()
executor.awaitTermination(10, TimeUnit.MINUTES)
}
Важным здесь является использование 'mergeN' для оценки нескольких значений' Process', которые испускаются родительским процессом. Но «Канал» не обязательно нужен; если 'chan' имеет тип' Int => Task [Int] ', вы можете использовать его следующим образом:' merge.mergeN (8) (Process.range (0, 100) .map (i => Process.eval (чан (я)))) ' –