Я построил график akka, который определяет поток. Моя цель - переформатировать мой будущий ответ и сохранить его в файле. Поток может быть указано ниже:Как вы относитесь к фьючерсам в Akka Flow?
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
val merger = builder.add(Merge[Future[Map[String, String]]](6))
val fileSink = FileIO.toPath(outputPath, options)
val ignoreSink = Sink.ignore
val in = Source(seeds)
in ~> balancer.in
for (i <- Range(0,6)) {
balancer.out(i) ~>
wikiFlow.async ~>
// This maps to a Future[Map[String, String]]
Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
merger
}
merger.out ~>
// When we merge we need to map our Map to a file
Flow[Future[Map[String, String]]].map((d) => {
// What is the proper way of serializing future map
// so I can work with it like a normal stream into fileSink?
// I could manually do ->
// d.foreach(someWriteToFileProcess(_))
// with ignoreSink, but this defeats the nice
// akka flow
}) ~>
fileSink
ClosedShape
})
Я могу взломать этот рабочий процесс, чтобы написать свою будущую карту в файл через Еогеасп, но я боюсь, что это может каким-то образом привести к проблемам с параллелизмом FileIO, и он просто не хорошо чувстовать. Каков надлежащий способ обработки фьючерсов с нашим аккским потоком?
Возможный дубликат [Создание противодавления из будущего внутри потока Акко] (http://stackoverflow.com/questions/39909303/create-backpressure-from-a -future-inside-akka-stream) –