2017-01-22 12 views
2

Я использую Alpakka-FTP, но, возможно, я ищу общий шаблон акка-потока. Соединитель FTP можно просмотреть список файлов или получить их:Akka Streams, исходные элементы как еще один источник?

def ls(host: String): Source[FtpFile, NotUsed] 
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]] 

В идеале я хотел бы создать поток, как это:

LIST 
    .FETCH_ITEM 
    .FOREACH(do something) 

Но я не смог создать такой поток с двумя функциями Я написал выше. Я чувствую, что я должен быть в состоянии получить там, используя Flow, что-то вроде

Ftp.ls 
    .via(some flow that uses the Ftp.fromPath above) 
    .runWith(Sink.foreach(do something)) 

Возможно ли это, учитывая только ls и fromPath функции выше?

EDIT:

Я могу работать его с помощью одного актера и mapAsync, но я все еще чувствую, что это должно быть более простым.

class Downloader extends Actor { 
    override def receive = { 
    case ftpFile: FtpFile => 
     Ftp.fromPath(Paths.get(ftpFile.path), settings) 
     .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right) 
     .run() pipeTo sender 
    } 
} 

val downloader = as.actorOf(Props(new Downloader)) 

Ftp.ls("test_path", settings) 
    .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult]) 
    .runWith(Sink.foreach(res => println("got it!" + res))) 

ответ

0

Для этой цели вы должны использовать flatMapConcat. Ваш конкретный пример может быть переписан в

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile => 
    Ftp.fromPath(Paths.get(ftpFile.path), settings) 
}.runWith(FileIO.toPath(Paths.get("testHDF.txt"))) 

Документация here.

 Смежные вопросы

  • Нет связанных вопросов^_^