2014-10-17 2 views
5

В моем случае у меня есть 2 стороны:Akka: тестирование мониторинг смерть смотреть

  1. watchee (я использую TestProbe)
  2. watcher (Watcher заворачивают в TestActorRef выставить некоторые внутренние state я отслеживать в моем тесте)

Наблюдатель должен предпринять некоторые действия, когда watchee умирает.

Вот полный тест я написал до сих пор:

class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll { 

    def this() = this(ActorSystem("TempTest")) 

    override def afterAll { 
    TestKit.shutdownActorSystem(system) 
    } 

    class WatcherActor(watchee: ActorRef) extends Actor { 

    var state = "initial" 
    context.watch(watchee) 

    override def receive: Receive = { 
     case "start" => 
     state = "start" 
     case _: Terminated => 
     state = "terminated" 
    } 

    } 

    test("example") { 
    val watchee = TestProbe() 
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref))) 

    assert(watcher.underlyingActor.state === "initial") 

    watcher ! "start" // "start" will be sent and handled by watcher synchronously 
    assert(watcher.underlyingActor.state === "start") 

    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher 
    Thread.sleep(100) // what is the best way to avoid blocking here? 
    assert(watcher.underlyingActor.state === "terminated") 
    } 

} 

Теперь, поскольку все заинтересованные субъекты используют CallingThreadDispatcher (все тестовые хелперы AKKA ныряет построены с использованием реквизита с .withDispatcher(CallingThreadDispatcher.Id)), я могу с уверенностью предположить, что, когда это утверждение возвращает:

watcher ! "start" 

... «Пуск» сообщение уже обработано WatchingActor и, таким образом, я могу сделать утверждение, основанное на watcher.underlyingActor.state

Однако, основываясь на моих наблюдениях, когда я перестану watchee с помощью system.stop или отправив Kill к нему Terminated сообщению, полученное в качестве побочного эффекта от watchee смерти получает асинхронно, в другом потоке.

Not-a-solution должен остановить watchee, заблокировать поток в течение некоторого времени и подтвердить состояние Watcher после этого, но я хотел бы знать, как правильно это сделать (то есть, как быть уверенным, что после убийства актер его наблюдатель получил и обработалTerminated сообщение сигнализация это смерть)?

ответ

3

EDIT: После обсуждения и тестирования с помощью OP мы обнаружили, что отправка PoisonPill в качестве средства прекращения наблюдаемого актера достигает желаемого поведения, так как обработанные прерывания от PPill обрабатываются синхронно, в то время как обработчики с остановкой или уничтожением обрабатываются асинхронно.

Хотя мы не уверены в этой причине, наша лучшая ставка заключается в том, что убийство актера вызывает исключение, а PPilling это не так.

По-видимому, это не имеет никакого отношения к использованию шаблона gracefulStop в соответствии с моим первоначальным предложением, о котором я расскажу ниже.

Таким образом, решение проблемы OP состояло в том, чтобы остановить наблюдаемого актера, отправляющего PPill вместо этого сообщение Kill или выполнив команду system.stop.


Старый ответ начинается здесь:

я могу идти, чтобы предложить что-то немного не связаны, но я чувствую, что это может применяться.

Если я правильно понял, что вы хотите сделать, это в основном прерывать актера синхронно, т. Е. Делать то, что возвращается только после того, как актер официально умер и его смерть была записана (в вашем случае наблюдателем).

В целом, уведомление о смерти, а также большинство остальных в акке, является асинхронным. Тем не менее, можно получить синхронное подтверждение смерти, используя шаблон gracefulStop (akka.pattern.gracefulStop).

Чтобы сделать это, код должен быть что-то похожее на:

val timeout = 5.seconds 
val killResultFuture = gracefulStop(victimRef, timeout, PoisonPill) 
Await.result(killResultFuture, timeout) 

Что это делает посылает PoisonPill к жертве (примечание: вы можете использовать пользовательское сообщение), который будет отвечать будущим который завершается после смерти жертвы. Используя Await.result, вы гарантированно будете синхронны.

К сожалению, это может использоваться только в том случае, если: a) вы активно убиваете жертву, и вместо этого вы не хотите реагировать на внешнюю причину смерти. B) Вы можете использовать таймауты и блокировку в своем коде. Но, может быть, вы сможете адаптировать этот шаблон к своей ситуации.

+0

Пробовал ваш сниппет, и это сработало. Самое смешное в том, что он страдает почти той же проблемой, что и исходный ответ @ cmbaxter (ожидающий результата «gracefulStop» гарантирует возвращение после «posStop» целевого актера, а не «Terminated», произведенный смертью целевого актера, потребляется) , Я наклонился, пытаясь понять, почему ваша отрезанная работа, и оказалось, что это не «gracefulStop», что заставляет ее работать, а то, что вы отправляете «PoisonPill» для прекращения действия актера. –

+0

По какой-то причине «Terminated», созданный смертью актера от «PoisonPill», обрабатывается синхронно, а те, которые производятся смертью из «Kill» или «system.stop» - асинхронно. Я еще не понял, почему, но в конце концов это означает, что можно просто отправить «PoisonPill» в «watchee» для достижения моей цели (зная, когда актер умер, и все «наблюдатели» были признаны), и что 'gracefulStop' не имеет ничего общего с решением этой проблемы (фактически, используя' gracefulStop' и 'Kill', поскольку сообщение вводит условие гонки, о котором я говорил в комментарии выше). –

+0

Я тоже не уверен. Может быть, это потому, что Kill бросает исключение, а PPill этого не делает? –

5

Один из способов исправить эту проблему - представить еще один наблюдатель в вашем тесте, который также наблюдает за watchee. Этот другой наблюдатель - это TestProbe, который позволит нам выполнить на нем утверждение, которое избавится от проблем, которые вы видите. Во-первых, измененный код теста:

val watchee = TestProbe() 
val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref))) 
val probeWatcher = TestProbe() 
probeWatcher watch watchee.ref 

assert(watcher.underlyingActor.state === "initial") 

watcher ! "start" // "start" will be sent and handled by watcher synchronously 
assert(watcher.underlyingActor.state === "start") 

system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher 
probeWatcher.expectTerminated(watchee.ref) 
assert(watcher.underlyingActor.state === "terminated") 

Таким образом, вы можете видеть, что я ввел дополнительный наблюдатель с линиями:

val probeWatcher = TestProbe() 
probeWatcher watch watchee.ref 

Тогда, в конце кода, перед окончательным утверждением, что неисправного для вас я использую еще одно утверждение, которое позволяет мне знать, что Terminated сообщений для остановленного актера было правильно распределено:

probeWatcher.expectTerminated (watchee.ref)

Когда код перемещается мимо этой строки, я могу быть уверен, что тестируемый watcher также получил сообщение с завершенным сообщением, и последующее утверждение будет проходить.

РЕДАКТИРОВАТЬ

Как было отмечено ОП, существует уровень не-детерминизма с этим кодом.Другим возможным решением является изменение строки в тестовый код, который останавливает актера:

watcher.underlyingActor.context.stop(watchee.ref) 

Используя context в TestActorRef я считаю, что Terminated будет доставлено все через CallingThreadDispatcher и, таким образом, полностью синхронны. Я проверил это в цикле, и это сработало для меня более 1000 итераций.

Теперь я подумал, что, может быть, потому что я выполнял stop, используя тот же актер, что ожидавшая Terminated, что, может быть, там была оптимизация для доставки Terminated к себе для этого scanario, поэтому я проверил это с совершенно другим Actor следующим образом:

class FooActor extends Actor{ 
    def receive = { 
    case _ => 
    } 

Затем в тестовом коде:

val foo = TestActorRef(new FooActor) 

а на остановке:

foo.underlyingActor.context.stop(watchee.ref) 

Это также сработало, как ожидалось.

+0

Я боюсь, что это не решение. Повторное построение теста таким образом, как вы предлагаете, вводит гоночный condiotion (что именно гарантирует, что 'Terminated', отправленный на' probeWatcher', будет обработан после 'Terminated', отправленного' watcher'?). Посмотрите на него, запустив код, который вы предоставили в цикл. –

+0

@EugenyLoy, я обновил свой ответ. – cmbaxter