2016-07-05 4 views
1

Я пытаюсь материализовать график внутри актера. Это похоже на работу, если одно из следующих условий:Материализация графика внутри актера

  1. График не содержит эфир (созданный с alsoTo) или
  2. То же ActorMaterializer используется для каждого материализации или
  3. графа материализуются снаружи из Actor

Я сократил его до следующих тестов:

import java.util.concurrent.{CountDownLatch, TimeUnit} 

import akka.NotUsed 
import akka.actor.{Actor, ActorSystem} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{RunnableGraph, Sink, Source} 
import akka.testkit.{TestActorRef, TestKit} 

import org.scalatest.{FlatSpecLike, Matchers} 

class ActorFlowTest extends TestKit(ActorSystem("ActorFlowTest")) with Matchers with FlatSpecLike { 

    def createGraph(withBroadcast: Boolean) = { 
    if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    else Source.empty.to(Sink.ignore) 
    } 

    case object Bomb 

    class FlowActor(
    graph: RunnableGraph[NotUsed], 
    latch: CountDownLatch, 
    materializer: (ActorSystem) => ActorMaterializer 
) extends Actor { 

    override def preStart(): Unit = { 
     graph.run()(materializer(context.system)) 
     latch.countDown() 
    } 

    override def receive: Receive = { 
     case Bomb => throw new RuntimeException 
    } 
    } 

    "Without an actor" should "be able to materialize twice" in { 
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    val materializer1 = ActorMaterializer()(system) 
    val materializer2 = ActorMaterializer()(system) 
    graph.run()(materializer1) 
    graph.run()(materializer2) // Pass 
    } 

    "With a the same materializer" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    val materializer = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new FlowActor(graph, latch, _ => materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer but no broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = false) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new FlowActor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer and a broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new FlowActor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Fail 
    } 

    def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = { 
    actorRef.start() 
    actorRef ! Bomb 
    latch.await(25, TimeUnit.SECONDS) 
    } 
} 

кажется, что последние случаи всегда таймаут со следующей ошибкой в ​​журнале:

[ERROR] [07/05/2016 16:06:30.625] [ActorFlowTest-akka.actor.default-dispatcher-6] [akka://ActorFlowTest/user/$$c] Futures timed out after [20000 milliseconds] 
akka.actor.PostRestartException: akka://ActorFlowTest/user/$$c: exception post restart (class java.lang.RuntimeException) 
    at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:250) 
    at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:248) 
    at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:303) 
    at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:298) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
    at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:248) 
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76) 
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:374) 
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:464) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) 
    at akka.testkit.CallingThreadDispatcher.process$1(CallingThreadDispatcher.scala:243) 
    at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:283) 
    at akka.testkit.CallingThreadDispatcher.systemDispatch(CallingThreadDispatcher.scala:191) 
    at akka.actor.dungeon.Dispatch$class.restart(Dispatch.scala:119) 
    at akka.actor.ActorCell.restart(ActorCell.scala:374) 
    at akka.actor.LocalActorRef.restart(ActorRef.scala:406) 
    at akka.actor.SupervisorStrategy.restartChild(FaultHandling.scala:365) 
    at akka.actor.OneForOneStrategy.processFailure(FaultHandling.scala:518) 
    at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:303) 
    at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:263) 
    at akka.actor.ActorCell.handleFailure(ActorCell.scala:374) 
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:223) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) 
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:167) 
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) 
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:165) 
    at scala.concurrent.Await$.result(package.scala:190) 
    at akka.stream.impl.ActorMaterializerImpl.actorOf(ActorMaterializerImpl.scala:207) 
    at akka.stream.impl.ActorMaterializerImpl$$anon$2.matGraph(ActorMaterializerImpl.scala:166) 
    at akka.stream.impl.ActorMaterializerImpl$$anon$2.materializeAtomic(ActorMaterializerImpl.scala:150) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:919) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915) 
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) 
    at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:922) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915) 
    at scala.collection.immutable.Set$Set4.foreach(Set.scala:200) 
    at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915) 
    at akka.stream.impl.MaterializerSession.materialize(StreamLayout.scala:882) 
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:182) 
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:80) 
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:351) 
    at ActorFlowTest$FlowActor.preStart(ActorFlowTest.scala:40) 
    at akka.actor.Actor$class.postRestart(Actor.scala:566) 
    at ActorFlowTest$FlowActor.postRestart(ActorFlowTest.scala:33) 
    at akka.actor.Actor$class.aroundPostRestart(Actor.scala:504) 
    at ActorFlowTest$FlowActor.aroundPostRestart(ActorFlowTest.scala:33) 
    at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:239) 
    ... 25 more 

Я попытался явно прекращения ActorMaterializers но не воспроизводит эту проблему.

Обойти это создать замыкание вокруг ActorMaterializer в Props, но если это также пришел из другого Actor я волнуюсь, я в конечном итоге получить аналогичные проблемы.

Любая идея, почему это так? Очевидно, что это как-то связано с ActorMaterializer, но интересно, как удаляет Broadcast и решает его (даже с гораздо более сложным графиком).

+0

Можете ли вы попытаться переопределить postRestart и проверить, что там происходит (проверьте входящее исключение и посмотрите, достигает ли код)? Также не забудьте вызвать preStart, когда переопределите postRestart. – thwiegan

+0

«RuntimeException», которое выбрано в 'receive', является« причиной »в' postRestart'. «TimeoutException» вызывается, когда 'postRestart' вызывает' preStart', и когда это вызывает 'graph.run'. – Steiny

ответ

1

Это, по-видимому, связано с (или, по крайней мере, разрешено посредством надлежащего) наблюдения. Я создал дополнительный Supervisor -Actor, который для демонстрации просто начинает сингл FlowActor в своей функции preStart и пересылает Bomb сообщениям. Следующие тесты успешно выполняются без какого-либо исключения таймаута:

import java.util.concurrent.{CountDownLatch, TimeUnit} 

import akka.NotUsed 
import akka.actor.Actor.Receive 
import akka.actor.SupervisorStrategy._ 
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{RunnableGraph, Sink, Source} 
import akka.testkit.{TestActorRef, TestKit} 
import org.scalatest.{FlatSpecLike, Matchers} 

import scala.concurrent.duration._ 

class ActorFlowTest extends TestKit(ActorSystem("TrikloSystem")) with Matchers with FlatSpecLike { 

    def createGraph(withBroadcast: Boolean) = { 
    if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    else Source.empty.to(Sink.ignore) 
    } 

    case object Bomb 

    class Supervisor(graph: RunnableGraph[NotUsed], 
        latch: CountDownLatch, 
        materializer: (ActorSystem) => ActorMaterializer) extends Actor { 

    var actorRef: Option[ActorRef] = None 

    override def preStart(): Unit = { 
     actorRef = Some(context.actorOf(Props(new FlowActor(graph, latch, materializer)))) 
    } 

    override def receive: Receive = { 
     case Bomb => actorRef.map(_ ! Bomb) 
    } 
    } 

    class FlowActor(
        graph: RunnableGraph[NotUsed], 
        latch: CountDownLatch, 
        materializer: (ActorSystem) => ActorMaterializer 
       ) extends Actor { 

    override def preStart(): Unit = { 
     graph.run()(materializer(context.system)) 
     latch.countDown() 
    } 

    override def receive: Receive = { 
     case Bomb => 
     throw new RuntimeException 
    } 
    } 

    "Without an actor" should "be able to materialize twice" in { 
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    val materializer1 = ActorMaterializer()(system) 
    val materializer2 = ActorMaterializer()(system) 
    graph.run()(materializer1) 
    graph.run()(materializer2) // Pass 
    } 

    "With a the same materializer" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    val materializer = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new Supervisor(graph, latch, _ => materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer but no broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = false) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new Supervisor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer and a broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new Supervisor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Fail 
    } 

    def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = { 
    actorRef.start() 
    actorRef ! Bomb 
    latch.await(25, TimeUnit.SECONDS) 
    } 
} 
+0

Это полезно, но, похоже, это только добавляет к списку оговорок. Почему актер-опекун-пользователь должен быть чем-то иным, чем надзиратель здесь? Другими словами, как это (или любое другое предостережение) действительно изменяет поведение? – Steiny

0

В этом тесте есть некоторые неправильные применения Akka TestKit.

TestActorRef - очень специальная тестовая конструкция, в которой она будет выполняться в вызывающем потоке (CallingThreadDispatcher), чтобы обеспечить легкое синхронное модульное тестирование. Использование CountDownLatch в синхронном тесте является странным, поскольку любое действие выполняется в одном потоке, поэтому нет необходимости в межпоточной связи.

Когда вы создаете экземпляр TestActorRef, он запускается в том же самом вызове (вы можете видеть это, например, бросая исключение из конструктора или preStart и видите его в тестовом примере).

Звонок на ActorRef определенно не то, что вы должны делать, специальная природа дает вам доступ к нему, но вы по существу вызываете начало на пустом актере оболочки, а не на актера, с которым, по вашему мнению, вы взаимодействуете (и если бы это был тот актер, то было бы неправильно когда-либо называть его start()).

Надлежащие (но не очень полезно, так как нет проблемы материализации график дважды вне зависимости от контекста или Materializer) испытания того, что вы собираетесь повторить тест будет без защелки и выглядеть примерно так:

class FlowActor(graph: RunnableGraph[NotUsed], materializer: (ActorSystem) => ActorMaterializer) extends Actor { 
    override def preStart(): Unit = { 
    graph.run()(materializer(context.system)) 
    } 
    override def receive: Receive = Actor.emptyBehavior 
} 

"With a new materializer and a broadcast" should "be able to materialize twice" in { 
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef1 = TestActorRef(new FlowActor(graph, materializer)) 
    val actorRef2 = TestActorRef(new FlowActor(graph, materializer)) 
    // we'd get an exception here if it was not possible to materialize 
    // since pre-start is run on the calling thread - the same thread 
    // that is executing the test case 
} 

Я бы просто позволил конкретным странностям этого идти вместо того, чтобы углубляться в магию в TestActorRef, это будет трудно заработанные идеи, и они не будут применяться во многих случаях, но это конкретное.