2016-08-03 3 views
3

Я не могу понять, как немедленно остановить поток akka Runnable Graph? Как использовать killswitch для достижения этого? Прошло всего несколько дней, когда я начал аккские потоки. В моем случае я читаю строки из файла и делаю некоторые операции в потоке и записи в раковину. Я хочу сделать это, прекратить чтение файла сразу, когда захочу, и я надеюсь, что это должно остановить рабочий график. Любые идеи по этому поводу будут очень признательны.Как внезапно остановить поток akka Runnable Graph?

Заранее спасибо.

ответ

2

Поскольку Akka Streams 2.4.3, есть элегантный способ остановить поток извне через KillSwitch.

Рассмотрим следующий пример, который останавливает поток через 10 секунд.

object ExampleStopStream extends App { 

    implicit val system = ActorSystem("streams") 
    implicit val materializer = ActorMaterializer() 

    import system.dispatcher 

    val source = Source. 
    fromIterator(() => Iterator.continually(Random.nextInt(100))). 
    delay(500.millis, DelayOverflowStrategy.dropHead) 
    val square = Flow[Int].map(x => x * x) 
    val sink = Sink.foreach(println) 

    val (killSwitch, done) = 
    source.via(square). 
    viaMat(KillSwitches.single)(Keep.right). 
    toMat(sink)(Keep.both).run() 

    system.scheduler.scheduleOnce(10.seconds) { 
    println("Shutting down...") 
    killSwitch.shutdown() 
    } 

    done.foreach { _ => 
    println("I'm done") 
    Await.result(system.terminate(), 1.seconds) 
    } 

} 
+0

Смотрите пример кода ниже: Как остановить следующий график, и отменить чтение большого файла? RunnableGraph.fromGraph (GraphDSL.create() { неявный конструктор => импорт GraphDSL.Implicits._ // Источник из файла вал A: Выход [String] = builder.add (readFileLineByLine) .out // записывает каждый строку в файл VAL E:. Впускной [String] = builder.add (writeLinesToFile) .в А ~> E ClosedShape }) запустить() – PainPoints

+0

вы предоставляете пример таким образом, что вы знаете, когда чтобы остановить систему, прежде чем вы начнете, но в моем случае мне нужно остановить запущенную систему в любое время. – PainPoints

+0

я вижу, из теста Акки потока я иду Tthis работу: вала Switch1 = KillSwitches.shared ("переключатель") VAL вниз по течению = RunnableGraph.fromGraph (GraphDSL.create() { неявного конструктор => импорта GraphDSL. Implicits._ // Источник из файла валей A: Выход [String] = builder.add (readFileLineByLine) .out // писать друг строку в файл вал E: Вход [String] = builder.add (writeLinesToFile) .in A.via (switch1.flow) ~> E Закрытая форма }). run() switch1.shutdown() – PainPoints

1

Один из способов иметь услугу или shutdownhookup, которые могут вызвать графа сократимое

val graph= 
    Source.tick(FiniteDuration(0,TimeUnit.SECONDS), FiniteDuration(1,TimeUnit.SECONDS), Random.nextInt).to(Sink.foreach(println)) 
    val cancellable=graph.run() 

    cancellable.cancel 

cancellable.cancel может быть частью ActorSystem.registerOnTermination

 Смежные вопросы

  • Нет связанных вопросов^_^