Я пытаюсь материализовать график внутри актера. Это похоже на работу, если одно из следующих условий:Материализация графика внутри актера
- График не содержит эфир (созданный с
alsoTo
) или - То же
ActorMaterializer
используется для каждого материализации или - графа материализуются снаружи из
Actor
Я сократил его до следующих тестов:
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.NotUsed
import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.testkit.{TestActorRef, TestKit}
import org.scalatest.{FlatSpecLike, Matchers}
class ActorFlowTest extends TestKit(ActorSystem("ActorFlowTest")) with Matchers with FlatSpecLike {
def createGraph(withBroadcast: Boolean) = {
if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
else Source.empty.to(Sink.ignore)
}
case object Bomb
class FlowActor(
graph: RunnableGraph[NotUsed],
latch: CountDownLatch,
materializer: (ActorSystem) => ActorMaterializer
) extends Actor {
override def preStart(): Unit = {
graph.run()(materializer(context.system))
latch.countDown()
}
override def receive: Receive = {
case Bomb => throw new RuntimeException
}
}
"Without an actor" should "be able to materialize twice" in {
val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
val materializer1 = ActorMaterializer()(system)
val materializer2 = ActorMaterializer()(system)
graph.run()(materializer1)
graph.run()(materializer2) // Pass
}
"With a the same materializer" should "be able to materialize twice" in {
val graph = createGraph(withBroadcast = true)
val latch = new CountDownLatch(2)
val materializer = ActorMaterializer()(system)
val actorRef = TestActorRef(new FlowActor(graph, latch, _ => materializer))
verify(actorRef, latch) should be(true) // Pass
}
"With a new materializer but no broadcast" should "be able to materialize twice" in {
val graph = createGraph(withBroadcast = false)
val latch = new CountDownLatch(2)
def materializer(system: ActorSystem) = ActorMaterializer()(system)
val actorRef = TestActorRef(new FlowActor(graph, latch, materializer))
verify(actorRef, latch) should be(true) // Pass
}
"With a new materializer and a broadcast" should "be able to materialize twice" in {
val graph = createGraph(withBroadcast = true)
val latch = new CountDownLatch(2)
def materializer(system: ActorSystem) = ActorMaterializer()(system)
val actorRef = TestActorRef(new FlowActor(graph, latch, materializer))
verify(actorRef, latch) should be(true) // Fail
}
def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = {
actorRef.start()
actorRef ! Bomb
latch.await(25, TimeUnit.SECONDS)
}
}
кажется, что последние случаи всегда таймаут со следующей ошибкой в журнале:
[ERROR] [07/05/2016 16:06:30.625] [ActorFlowTest-akka.actor.default-dispatcher-6] [akka://ActorFlowTest/user/$$c] Futures timed out after [20000 milliseconds]
akka.actor.PostRestartException: akka://ActorFlowTest/user/$$c: exception post restart (class java.lang.RuntimeException)
at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:250)
at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:248)
at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:303)
at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:298)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:248)
at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
at akka.actor.ActorCell.faultRecreate(ActorCell.scala:374)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:464)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.testkit.CallingThreadDispatcher.process$1(CallingThreadDispatcher.scala:243)
at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:283)
at akka.testkit.CallingThreadDispatcher.systemDispatch(CallingThreadDispatcher.scala:191)
at akka.actor.dungeon.Dispatch$class.restart(Dispatch.scala:119)
at akka.actor.ActorCell.restart(ActorCell.scala:374)
at akka.actor.LocalActorRef.restart(ActorRef.scala:406)
at akka.actor.SupervisorStrategy.restartChild(FaultHandling.scala:365)
at akka.actor.OneForOneStrategy.processFailure(FaultHandling.scala:518)
at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:303)
at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:263)
at akka.actor.ActorCell.handleFailure(ActorCell.scala:374)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:167)
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:165)
at scala.concurrent.Await$.result(package.scala:190)
at akka.stream.impl.ActorMaterializerImpl.actorOf(ActorMaterializerImpl.scala:207)
at akka.stream.impl.ActorMaterializerImpl$$anon$2.matGraph(ActorMaterializerImpl.scala:166)
at akka.stream.impl.ActorMaterializerImpl$$anon$2.materializeAtomic(ActorMaterializerImpl.scala:150)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:919)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:922)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915)
at scala.collection.immutable.Set$Set4.foreach(Set.scala:200)
at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915)
at akka.stream.impl.MaterializerSession.materialize(StreamLayout.scala:882)
at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:182)
at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:80)
at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:351)
at ActorFlowTest$FlowActor.preStart(ActorFlowTest.scala:40)
at akka.actor.Actor$class.postRestart(Actor.scala:566)
at ActorFlowTest$FlowActor.postRestart(ActorFlowTest.scala:33)
at akka.actor.Actor$class.aroundPostRestart(Actor.scala:504)
at ActorFlowTest$FlowActor.aroundPostRestart(ActorFlowTest.scala:33)
at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:239)
... 25 more
Я попытался явно прекращения ActorMaterializers
но не воспроизводит эту проблему.
Обойти это создать замыкание вокруг ActorMaterializer
в Props
, но если это также пришел из другого Actor
я волнуюсь, я в конечном итоге получить аналогичные проблемы.
Любая идея, почему это так? Очевидно, что это как-то связано с ActorMaterializer
, но интересно, как удаляет Broadcast и решает его (даже с гораздо более сложным графиком).
Можете ли вы попытаться переопределить postRestart и проверить, что там происходит (проверьте входящее исключение и посмотрите, достигает ли код)? Также не забудьте вызвать preStart, когда переопределите postRestart. – thwiegan
«RuntimeException», которое выбрано в 'receive', является« причиной »в' postRestart'. «TimeoutException» вызывается, когда 'postRestart' вызывает' preStart', и когда это вызывает 'graph.run'. – Steiny