2016-04-04 7 views
0

Рассмотрим этот код:Scalaz потока скобка закрывается ресурс раннего

import java.time.Instant 
import scalaz.concurrent.Task 
import scalaz.stream._ 

    class MyResource { 

    println("resource obtained") 

    @volatile var v = 1 

    def read() = v 

    def close() { 
     v = 0 
     println("resource closed") 
    } 
    } 

    val expensiveCalculation = (i: Int, rc: MyResource) => { 
    Thread.sleep(1000) 
    val r = rc.read() 
    println(s"calculating ($i, $r) on ${Thread.currentThread} at ${Instant.now}") 
    i/r 
    } 

Я хотел бы использовать Process.bracket для управления ресурсом:

val resource = Process.bracket(Task.delay(new MyResource))(r => Process.eval_(Task.now(r.close())))(rc => Process.repeatEval(Task.now(rc))) 

И запустить расчет для диапазона номера:

val range = Process.range(0, 5).toSource 

val s1 = range.tee(resource)(tee.zip).map { 
    case (i, rc) => expensiveCalculation(i, rc) 
} 

println(s"start s1 at ${Instant.now}") 
val res1 = s1.runLog.attemptRun 
println(s"done at ${Instant.now}: $res1") 

до сих пор так хорошо ... Я получаю этот результат:

start s1 at 2016-04-04T13:44:26.450Z 
resource obtained 
calculating (0, 1) on Thread[main,5,main] at 2016-04-04T13:44:27.565Z 
calculating (1, 1) on Thread[main,5,main] at 2016-04-04T13:44:28.573Z 
calculating (2, 1) on Thread[main,5,main] at 2016-04-04T13:44:29.575Z 
calculating (3, 1) on Thread[main,5,main] at 2016-04-04T13:44:30.577Z 
calculating (4, 1) on Thread[main,5,main] at 2016-04-04T13:44:31.578Z 
resource closed 
done at 2016-04-04T13:44:31.597Z: \/-(Vector(0, 1, 2, 3, 4)) 

Теперь я хотел бы запустить дорогостоящий расчет параллельно:

val s2 = merge.mergeN(range.zipWith(resource){ 
    case (i, rc) => Process.eval(Task.delay(expensiveCalculation(i, rc))) 
}) 

println(s"start s2 at ${Instant.now}") 
val res2 = s2.runLog.attemptRun 
println(s"done at ${Instant.now}: $res2") 

И я получаю этот результат:

start s2 at 2016-04-04T13:44:31.601Z 
resource obtained 
resource closed 
calculating (0, 0) on Thread[pool-1-thread-3,5,main] at 2016-04-04T13:44:32.717Z 
calculating (1, 0) on Thread[pool-1-thread-8,5,main] at 2016-04-04T13:44:32.717Z 
calculating (4, 0) on Thread[pool-1-thread-7,5,main] at 2016-04-04T13:44:32.717Z 
calculating (2, 0) on Thread[pool-1-thread-2,5,main] at 2016-04-04T13:44:32.717Z 
calculating (3, 0) on Thread[pool-1-thread-6,5,main] at 2016-04-04T13:44:32.717Z 
done at 2016-04-04T13:44:32.779Z: -\/(java.lang.ArithmeticException:/by zero) 

Видимо функция релиз кронштейна называется слишком рано и преждевременно закрывая ресурс. Что я делаю не так? Есть ли способ закрыть ресурс после завершения параллельного вычисления?

Спасибо.

ответ

0

После прочтения еще несколько статей и экспериментировать с scalaz потоком я пришел к этому решению о том, чтобы освободить ресурс после параллельного выполнения:

val p: Process[Task, Int] = Process.await(Task.delay(new MyResource)) { rc => 

    val range = Process.range(0, 5).toSource 

    merge.mergeN(range.map { i => 
     Process.eval(Task.delay(expensiveCalculation(i, rc))) 
    }).onComplete(Process eval_ Task.delay { 
     rc.close() 
    }) 
} 

p.runLog.attemptRun 

 Смежные вопросы

  • Нет связанных вопросов^_^