2015-11-05 6 views
0

Как подключить Source[String, Unit] к потоковому актеру?Поток csv-файла в браузер с использованием потока akka и спрей

Я думаю, что модифицированная версия StreamingActor от https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33 будет работать хорошо, но у меня возникли трудности с подключением частей.

Учитывая, что source: Source[String, Unit] и ctx: RequestContext, я думаю, что модифицированный StreamingActor должен подключаться к actorRefFactory.actorOf(fromSource(source, ctx)).

Для справки, суть выше:

import akka.actor._ 
import akka.util.ByteString 
import spray.http.HttpEntity.Empty 
import spray.http.MediaTypes._ 
import spray.http._ 
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp} 

object StreamingActor { 

    // helper methods 

    def fromString(iterable: Iterable[String], ctx: RequestContext): Props = { 
    fromHttpData(iterable.map(HttpData.apply), ctx) 
    } 
    def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = { 
    fromHttpData(iterable.map(HttpData.apply), ctx) 
    } 
    def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = { 
    fromHttpData(iterable.map(HttpData.apply), ctx) 
    } 
    def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = { 
    fromHttpData(iterable.map(HttpData.apply), ctx) 
    } 
    def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = { 
    Props(new StreamingActor(iterable, ctx)) 
    } 

    // initial message sent by StreamingActor to itself 
    private case object FirstChunk 

    // confirmation that given chunk was sent to client 
    private case object ChunkAck 

} 

class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging { 

    import StreamingActor._ 

    def actorRefFactory = context 

    val chunkIterator: Iterator[HttpData] = chunks.iterator 

    self ! FirstChunk 

    def receive = { 

    // send first chunk to client 
    case FirstChunk if chunkIterator.hasNext => 
     val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next())) 
     ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck) 

    // data stream is empty. Respond with Content-Length: 0 and stop 
    case FirstChunk => 
     ctx.responder ! HttpResponse(entity = Empty) 
     context.stop(self) 

    // send next chunk to client 
    case ChunkAck if chunkIterator.hasNext => 
     val nextChunk = MessageChunk(chunkIterator.next()) 
     ctx.responder ! nextChunk.withAck(ChunkAck) 

    // all chunks were sent. stop. 
    case ChunkAck => 
     ctx.responder ! ChunkedMessageEnd 
     context.stop(self) 

    // 
    case x => unhandled(x) 
    } 

} 
+0

Использует ли актер для отправки нескольких значений HttpResponse? Я думаю, вам будет гораздо лучше, если отправить 1 HttpResponse с HttpEntity.Chunked, см. Http://stackoverflow.com/questions/33123280/akka-http-send-continuous-chunked-http-response-stream –

ответ

2

Я думаю, что ваше использование StreamingActor чрезмерно усложняет основную проблему, которую вы пытаетесь решить. Кроме того, StreamingActor в вопросе будет производить несколько значений HttpResponse, 1 для каждого Chunk, для одного HttpRequest. Это неэффективно, потому что вы можете просто вернуть 1 HttpReponse с HttpEntity.Chunked в качестве объекта для источника данных.

Общие Параллелизм Дизайн

Актеры для государства, например, поддерживая рабочий счетчик между соединениями. И даже тогда Agent покрывает много земли с дополнительным преимуществом проверки типа (в отличие от Actor.receive, который превращает почтовый ящик мертвой буквы в ваш единственный тип проверки во время выполнения).

Параллельное вычисление, а не состояние, следует обращаться (в порядке убывания):

  1. фьючерсов в качестве первого рассмотрения: компонуемы, компилировать тип времени безопасной проверки, и лучший выбор для большинства случаев.

  2. akka Потоки: сгруппированы, скомпилированы в режиме безопасного времени и очень полезны, но есть много overhead, что является результатом удобной функции противодавления. Пары также представляют собой то, как формируются объекты HttpResponse, как показано ниже.

Streaming CSV файлов

Вы Основополагающий вопрос заключается в том, чтобы поток файл CSV для клиента HTTP с использованием потоков. Вы можете начать с создания источника данных и встраивание его в пределах HttpResponse:

def lines() = scala.io.Source.fromFile("DataFile.csv").getLines() 

import akka.util.ByteString 
import akka.http.model.HttpEntity 

def chunkSource : Source[HttpEntity.ChunkStreamPart, Unit] = 
    akka.stream.scaladsl.Source(lines) 
         .map(ByteString.apply) 
         .map(HttpEntity.ChunkStreamPart.apply) 

def httpFileResponse = 
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, chunkSource)) 

Вы можете предоставить этот ответ для любых запросов:

val fileRequestHandler = { 
    case HttpRequest(GET, Uri.Path("/csvFile"), _, _, _) => httpFileResponse 
} 

Затем встроить fileRequestHandler в вашу логику маршрутизации сервера.