2017-02-06 13 views
4

Я построил график akka, который определяет поток. Моя цель - переформатировать мой будущий ответ и сохранить его в файле. Поток может быть указано ниже:Как вы относитесь к фьючерсам в Akka Flow?

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
     import GraphDSL.Implicits._ 
     val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false)) 
     val merger = builder.add(Merge[Future[Map[String, String]]](6)) 
     val fileSink = FileIO.toPath(outputPath, options) 
     val ignoreSink = Sink.ignore 
     val in = Source(seeds) 
     in ~> balancer.in 
     for (i <- Range(0,6)) { 
     balancer.out(i) ~> 
      wikiFlow.async ~> 
      // This maps to a Future[Map[String, String]] 
      Flow[(Try[HttpResponse], String)].map(parseHtml) ~> 
      merger 
     } 

     merger.out ~> 
     // When we merge we need to map our Map to a file 
     Flow[Future[Map[String, String]]].map((d) => { 
     // What is the proper way of serializing future map 
     // so I can work with it like a normal stream into fileSink? 

     // I could manually do -> 
     // d.foreach(someWriteToFileProcess(_)) 
     // with ignoreSink, but this defeats the nice 
     // akka flow 
     }) ~> 
     fileSink 

     ClosedShape 
    }) 

Я могу взломать этот рабочий процесс, чтобы написать свою будущую карту в файл через Еогеасп, но я боюсь, что это может каким-то образом привести к проблемам с параллелизмом FileIO, и он просто не хорошо чувстовать. Каков надлежащий способ обработки фьючерсов с нашим аккским потоком?

+0

Возможный дубликат [Создание противодавления из будущего внутри потока Акко] (http://stackoverflow.com/questions/39909303/create-backpressure-from-a -future-inside-akka-stream) –

ответ

9

Самый простой способ создать Flow, который включает асинхронное вычисление, - это использовать mapAsync.

Так что ... позволяет сказать, что вы хотите создать Flow, который потребляет Int и производит String с помощью асинхронного вычисления mapper: Int => Future[String] с параллелизмом 5.

val mapper: Int => Future[String] = (i: Int) => Future(i.toString) 

val yourFlow = Flow[Int].mapAsync[String](5)(mapper) 

Теперь вы можете использовать этот поток в вашем графике как вы хотите.

Пример использования будет,

val graph = GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val intSource = Source(1 to 10) 

    val printSink = Sink.foreach[String](s => println(s)) 

    val yourMapper: Int => Future[String] = (i: Int) => Future(i.toString) 

    val yourFlow = Flow[Int].mapAsync[String](2)(yourMapper) 

    intSource ~> yourFlow ~> printSink 

    ClosedShape 
} 
+0

Я проверил это позже сегодня вечером. Не знал о mapAsync, я не видел его в своем руководстве. Благодаря! –