2017-01-29 15 views
0

Я довольно новичок в потоке Akka, я некоторое время работал с Rx, поэтому знаю довольно хорошо всех операторов, но я не могу понять, почему мой конвейер не испускает значенияПроблемы с потоком в потоке akka

Вот мой код

@Test def mainFlow(): Unit = { 
    val increase = Flow[Int] 
     .map(value => value * 10) 
    val filterFlow = Flow[Int] 
     .filter(value => value > 50) 
     .take(2) 
    Source(0 to 10) 
     .via(increase) 
     .via(filterFlow) 
     .to(Sink.foreach(value => println(s"Item emitted:$value"))) 
     .run() 
    } 

Первый поток преобразования значения излучаемых в исходном умножению на 10, и фильтр второго потока путем получить только пунктов выше, чем 50, а затем я просто получить 2, так Я ожидал иметь в раковине 60 и 70 Но ничего не вышло.

Любая идея, почему?

ответ

3

Ваш поток правильно построен и испускает эти 2 элемента, которые вы упомянули. Я считаю, что проблема связана с вашим тестом. А именно, поток выполняется асинхронно, и ваш тест является простой процедурой Unit. Поэтому тест не будет ждать, пока поток не будет запущен.

Для выполнения ваших утверждений вам необходимо ввести некоторую синхронизацию в своем тесте. Один из способов сделать это - использовать признак ScalaFutures от ScalaTest, который предлагает вам метод futureValue.

val increase = Flow[Int] 
    .map(value => value * 10) 
val filterFlow = Flow[Int] 
    .filter(value => value > 50) 
    .take(2) 
Source(0 to 10) 
    .via(increase) 
    .via(filterFlow) 
    .runForeach(value => println(s"Item emitted:$value")) 
    .futureValue 

Обратите внимание, что .to(Sink.foreach{...}).run() не разоблачить Future[Done] вам нужно синхронизировать. Ваш код должен быть изменен на .toMat(Sink.foreach{...})(Keep.right).run(), который может быть сокращен до .runForeach(...).

+0

Спасибо, я не знал, что когда вы используете поток, трубопровод станет Async – paul

1

Потому что вы говорите, является следующее:

Для чисел 1..10 умножать на 10, но только когда-либо производить 2 первые элементы затем сохранить все эти элементы, которые больше, чем 50, а затем распечатать их.

Кроме того, ваш тест не дожидается завершения RunnableFlow, что обычно означает, что ваша программа выйдет, прежде чем поток сможет запустить (потоки Akka работают асинхронно).

Обратите внимание, что для примера нет оснований использовать GraphDSL, код идентичен:

Source(1 to 10).map(_ * 10).take(2).filter(_ > 50).runForeach(println) 

Но так как он ничего не делает «осмысленно асинхронным» Я думаю, что вы были бы гораздо лучше с:

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println) 

Но опять же, с текущим состоянием кода, это эквивалентно следующему выражению:

() 
+0

Я изучаю поток Акка, вот почему я использую этот глупый пример. О том, что весь поток Akka работает async, я не понимаю, как этот пример противодавления работает без ожидания https://github.com/politrons/Akka/blob/master/src/main/scala/stream/BackPressure. Скала – paul