Рассмотрим этот код:Scalaz потока скобка закрывается ресурс раннего
import java.time.Instant
import scalaz.concurrent.Task
import scalaz.stream._
class MyResource {
println("resource obtained")
@volatile var v = 1
def read() = v
def close() {
v = 0
println("resource closed")
}
}
val expensiveCalculation = (i: Int, rc: MyResource) => {
Thread.sleep(1000)
val r = rc.read()
println(s"calculating ($i, $r) on ${Thread.currentThread} at ${Instant.now}")
i/r
}
Я хотел бы использовать Process.bracket для управления ресурсом:
val resource = Process.bracket(Task.delay(new MyResource))(r => Process.eval_(Task.now(r.close())))(rc => Process.repeatEval(Task.now(rc)))
И запустить расчет для диапазона номера:
val range = Process.range(0, 5).toSource
val s1 = range.tee(resource)(tee.zip).map {
case (i, rc) => expensiveCalculation(i, rc)
}
println(s"start s1 at ${Instant.now}")
val res1 = s1.runLog.attemptRun
println(s"done at ${Instant.now}: $res1")
до сих пор так хорошо ... Я получаю этот результат:
start s1 at 2016-04-04T13:44:26.450Z
resource obtained
calculating (0, 1) on Thread[main,5,main] at 2016-04-04T13:44:27.565Z
calculating (1, 1) on Thread[main,5,main] at 2016-04-04T13:44:28.573Z
calculating (2, 1) on Thread[main,5,main] at 2016-04-04T13:44:29.575Z
calculating (3, 1) on Thread[main,5,main] at 2016-04-04T13:44:30.577Z
calculating (4, 1) on Thread[main,5,main] at 2016-04-04T13:44:31.578Z
resource closed
done at 2016-04-04T13:44:31.597Z: \/-(Vector(0, 1, 2, 3, 4))
Теперь я хотел бы запустить дорогостоящий расчет параллельно:
val s2 = merge.mergeN(range.zipWith(resource){
case (i, rc) => Process.eval(Task.delay(expensiveCalculation(i, rc)))
})
println(s"start s2 at ${Instant.now}")
val res2 = s2.runLog.attemptRun
println(s"done at ${Instant.now}: $res2")
И я получаю этот результат:
start s2 at 2016-04-04T13:44:31.601Z
resource obtained
resource closed
calculating (0, 0) on Thread[pool-1-thread-3,5,main] at 2016-04-04T13:44:32.717Z
calculating (1, 0) on Thread[pool-1-thread-8,5,main] at 2016-04-04T13:44:32.717Z
calculating (4, 0) on Thread[pool-1-thread-7,5,main] at 2016-04-04T13:44:32.717Z
calculating (2, 0) on Thread[pool-1-thread-2,5,main] at 2016-04-04T13:44:32.717Z
calculating (3, 0) on Thread[pool-1-thread-6,5,main] at 2016-04-04T13:44:32.717Z
done at 2016-04-04T13:44:32.779Z: -\/(java.lang.ArithmeticException:/by zero)
Видимо функция релиз кронштейна называется слишком рано и преждевременно закрывая ресурс. Что я делаю не так? Есть ли способ закрыть ресурс после завершения параллельного вычисления?
Спасибо.