Как подключить Source[String, Unit]
к потоковому актеру?Поток csv-файла в браузер с использованием потока akka и спрей
Я думаю, что модифицированная версия StreamingActor
от https://gist.github.com/whysoserious/96050c6b4bd5fedb6e33 будет работать хорошо, но у меня возникли трудности с подключением частей.
Учитывая, что source: Source[String, Unit]
и ctx: RequestContext
, я думаю, что модифицированный StreamingActor
должен подключаться к actorRefFactory.actorOf(fromSource(source, ctx))
.
Для справки, суть выше:
import akka.actor._
import akka.util.ByteString
import spray.http.HttpEntity.Empty
import spray.http.MediaTypes._
import spray.http._
import spray.routing.{HttpService, RequestContext, SimpleRoutingApp}
object StreamingActor {
// helper methods
def fromString(iterable: Iterable[String], ctx: RequestContext): Props = {
fromHttpData(iterable.map(HttpData.apply), ctx)
}
def fromStringAndCharset(iterable: Iterable[String], ctx: RequestContext, charset: HttpCharset): Props = {
fromHttpData(iterable.map(HttpData.apply), ctx)
}
def fromByteArray(iterable: Iterable[Array[Byte]], ctx: RequestContext): Props = {
fromHttpData(iterable.map(HttpData.apply), ctx)
}
def fromByteString(iterable: Iterable[ByteString], ctx: RequestContext): Props = {
fromHttpData(iterable.map(HttpData.apply), ctx)
}
def fromHttpData(iterable: Iterable[HttpData], ctx: RequestContext): Props = {
Props(new StreamingActor(iterable, ctx))
}
// initial message sent by StreamingActor to itself
private case object FirstChunk
// confirmation that given chunk was sent to client
private case object ChunkAck
}
class StreamingActor(chunks: Iterable[HttpData], ctx: RequestContext) extends Actor with HttpService with ActorLogging {
import StreamingActor._
def actorRefFactory = context
val chunkIterator: Iterator[HttpData] = chunks.iterator
self ! FirstChunk
def receive = {
// send first chunk to client
case FirstChunk if chunkIterator.hasNext =>
val responseStart = HttpResponse(entity = HttpEntity(`text/html`, chunkIterator.next()))
ctx.responder ! ChunkedResponseStart(responseStart).withAck(ChunkAck)
// data stream is empty. Respond with Content-Length: 0 and stop
case FirstChunk =>
ctx.responder ! HttpResponse(entity = Empty)
context.stop(self)
// send next chunk to client
case ChunkAck if chunkIterator.hasNext =>
val nextChunk = MessageChunk(chunkIterator.next())
ctx.responder ! nextChunk.withAck(ChunkAck)
// all chunks were sent. stop.
case ChunkAck =>
ctx.responder ! ChunkedMessageEnd
context.stop(self)
//
case x => unhandled(x)
}
}
Использует ли актер для отправки нескольких значений HttpResponse? Я думаю, вам будет гораздо лучше, если отправить 1 HttpResponse с HttpEntity.Chunked, см. Http://stackoverflow.com/questions/33123280/akka-http-send-continuous-chunked-http-response-stream –