2016-12-30 9 views
3

Я пытаюсь разбить входящий поток Akka байтов (из тела HTTP-запроса, но он также может быть из файла) в несколько файлов определенного размер.Akka stream - Разделение потока ByteString на несколько файлов

Например, если я загружаю файл 10 ГБ, он создаст что-то вроде 10 файлов 1 ГБ. Файлы будут иметь случайные имена. Моя проблема в том, что я не знаю, с чего начать, потому что все ответы и примеры, которые я прочитал, либо хранят весь фрагмент в памяти, либо используют разделитель на основе строки. Кроме того, у меня не может быть «кусков» 1Gb, а затем просто напишите их на диск.

Есть ли простой Способ выполнения такого рода операций? Моя единственная идея - использовать что-то вроде этого http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings, но преобразуется в нечто вроде FlowShape[ByteString, File], вставляя себя в файл куски, пока не достигнут максимальный размер файла, затем создадим новый файл и т. Д., И потоковое воспроизведение созданных файлов. Что выглядит как зверская идея не используя должным образом Акку~d ..

Заранее спасибо

ответ

6

я часто вернуться к чисто функциональные, не-Акку методы для таких проблем, как это и затем «поднять» эти функции в AKKA конструкций , Под этим я подразумеваю, что я стараюсь использовать только SCALA «материал», а затем попытаться обернуть этот материал внутри Акку позже ...

Создание файла

Начиная с FileOutputStream создания на основе «генерируется случайным образом имена ":

def randomFileNameGenerator : String = ??? //not specified in question 

import java.io.FileOutputStream 

val randomFileOutGenerator :() => FileOutputStream = 
() => new FileOutputStream(randomFileNameGenerator) 

Государственные

Там должен быть какой-то способ хранения„состояние“текущего файла, например, количество байтов уже написано:

case class FileState(byteCount : Int = 0, 
        fileOut : FileOutputStream = randomFileOutGenerator()) 

Файл Запись

Сначала мы определяем, если бы мы прорвать порог максимального размера файла с заданным ByteString:

import akka.util.ByteString 

val isEndOfChunk : (FileState, ByteString, Int) => Boolean = 
    (state, byteString, maxBytes) => 
    state.byteCount + byteString.length > maxBytes 

Мы тогда имеем для записи функции, которая создает новый FileState, если мы увеличили емкость текущего или вернули текущее состояние, если оно все еще ниже емкости:

val closeFileInState : FileState => Unit = 
    (_ : FileState).fileOut.close() 

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
    (state, byteString, maxBytes) => 
    if(isEndOfChunk(maxBytes, state, byteString)) { 
     closeFileInState(state) 
     FileState() 
    } 
    else 
     state 

Единственное, что осталось, чтобы написать в FileOutputStream:

val writeToFileAndReturn(FileState, ByteString) => FileState = 
    (fileState, byteString) => { 
    fileState.fileOut write byteString.toArray 
    fileState copy (byteCount = fileState.byteCount + byteString.size) 
    } 

//the signature ordering will become useful 
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =  
    writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)  

Fold на любой GenTraversableOnce

В Скале GenTraversableOnce является любая коллекция, параллельно или нет, что имеет . К ним относятся Iterator, Vector, Array, Seq, scala stream, ...Th Окончательный writeToChunkedFile функция идеально соответствует подписи GenTraversableOnce#fold:

val anyIterable : Iterable = ??? 

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes)) 

Один последний свободный конец; последние FileOutputStream также должны быть закрыты. Так как раз будет издавать только, что последний FileState мы можем закрыть, что один:

closeFileInState(finalFileState) 

Akka Streams

Akka Flow получает его fold из FlowOps#fold, которое происходит в соответствии с GenTraversableOnce подпись. Поэтому мы можем «поднять» наши регулярные функции в значениях потока подобно тому, как мы использовали Iterable раз:

import akka.stream.scaladsl.Flow 

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
    Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes)) 

Хорошая часть об обработке проблемы с регулярными функциями является то, что они могут быть использованы в других асинхронных структур за пределами потоков , например Фьючерсы или актеры. Вы также не нуждаетесь в akka ActorSystem в модульном тестировании, а только в обычных структурах данных языка.

import akka.stream.scaladsl.Sink 
import scala.concurrent.Future 

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
    chunkerFlow(maxBytes) to (Sink foreach closeFileInState) 

Вы можете использовать этот Sink стечь HttpEntity исходя из HttpRequest.

+0

Wow thanks, это действительно подробный ответ! Не было этого. Любая разница между этим подходом с 'fold' и с той же логикой (сохраняющая состояние и создающая вручную выходной поток) внутри' GraphStageLogic'? Например (на основе вашего комментария + предыдущей ссылки, которую я дал) http://pastebin.com/tzLFAmzk? Небольшой плюс заключается в том, что он позволяет передавать файлы сразу после их создания (но код более длинный и более подверженный ошибкам). – Vuzi

+0

@Vuzi Добро пожаловать. Основное различие, которое я видел между моим подходом и «GraphStageLogic» в «реальном мире», тестировалось. С моими функциями все тестирование может быть выполнено без «ActorSystem», «ActorMaterializer» и «ExecutionContext». Если вы поместите свою логику в конструкцию потока akka, вам понадобится вся инфраструктура akka для проверки этой логики. Счастливый взлом. –

1

Вы можете написать собственный график графа. Ваша проблема аналогична проблеме, с которой сталкивается альпакка во время загрузки на амазон S3. (google alpakka s3 connector .. они не позволят мне размещать более двух ссылок)

По какой-то причине соединитель s3 DiskBuffer, однако, записывает весь входящий источник байтов в файл, прежде чем выделять кусок для дальнейшей обработки потока ..

Мы хотим что-то подобное limit a source of byte strings to specific length. В этом примере они ограничивают входящий источник [ByteString, _] источником фиксированных байтов с помощью фиксированного размера, поддерживая буфер памяти. Я принял его для работы с файлами. Преимущество этого в том, что вы можете использовать выделенный пул потоков для этого этапа для блокировки ввода-вывода. Для хорошего реактивного потока вы хотите сохранить блокировку ввода-вывода в отдельном пуле потоков в актерской системе. PS: это не пытается делать файлы точного размера .. так что если мы прочитаем дополнительно 2 КБ в файле 100 МБ, мы напишем эти лишние байты в текущий файл, а не пытаемся достичь точного размера.

import java.io.{FileOutputStream, RandomAccessFile} 
import java.nio.channels.FileChannel 
import java.nio.file.Path 

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} 
import akka.stream._ 
import akka.util.ByteString 

case class MultipartUploadChunk(path: Path, size: Int, partNumber: Int) 
//Starts writing the byteStrings received from upstream to a file. Emits a path after writing a partSize number of bytes. Does not attemtp to write exact number of bytes. 
class FileChunker(maxSize: Int, tempDir: Path, partSize: Int) 
    extends GraphStage[FlowShape[ByteString, MultipartUploadChunk]] { 

    assert(maxSize > partSize, "Max size should be larger than part size. ") 

    val in: Inlet[ByteString] = Inlet[ByteString]("PartsMaker.in") 
    val out: Outlet[MultipartUploadChunk] = Outlet[MultipartUploadChunk]("PartsMaker.out") 

    override val shape: FlowShape[ByteString, MultipartUploadChunk] = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) with OutHandler with InHandler { 

     var partNumber: Int = 0 
     var length: Int = 0 
     var currentBuffer: Option[PartBuffer] = None 

     override def onPull(): Unit = 
     if (isClosed(in)) { 
      emitPart(currentBuffer, length) 
     } else { 
      pull(in) 
     } 

     override def onPush(): Unit = { 
     val elem = grab(in) 
     length += elem.size 
     val currentPart: PartBuffer = currentBuffer match { 
      case Some(part) => part 
      case None => 
      val newPart = createPart(partNumber) 
      currentBuffer = Some(newPart) 
      newPart 
     } 
     currentPart.fileChannel.write(elem.asByteBuffer) 
     if (length > partSize) { 
      emitPart(currentBuffer, length) 
      //3. .increment part number, reset length. 
      partNumber += 1 
      length = 0 
     } else { 
      pull(in) 
     } 
     } 

     override def onUpstreamFinish(): Unit = 
     if (length > 0) emitPart(currentBuffer, length) // emit part only if something is still left in current buffer. 

     private def emitPart(maybePart: Option[PartBuffer], size: Int): Unit = maybePart match { 
     case Some(part) => 
      //1. flush the part buffer and truncate the file. 
      part.fileChannel.force(false) 
      //   not sure why we do this truncate.. but was being done in alpakka. also maybe safe to do. 
//     val ch = new FileOutputStream(part.path.toFile).getChannel 
//   try { 
//   println(s"truncating to size $size") 
//   ch.truncate(size) 
//   } finally { 
//   ch.close() 
//   } 
      //2emit the part 
      val chunk = MultipartUploadChunk(path = part.path, size = length, partNumber = partNumber) 
      push(out, chunk) 
      part.fileChannel.close() // TODO: probably close elsewhere. 
      currentBuffer = None 
      //complete stage if in is closed. 
      if (isClosed(in)) completeStage() 
     case None => if (isClosed(in)) completeStage() 
     } 

     private def createPart(partNum: Int): PartBuffer = { 
     val path: Path = partFile(partNum) 
     //currentPart.deleteOnExit() //TODO: Enable in prod. requests that the file be deleted when VM dies. 
     PartBuffer(path, new RandomAccessFile(path.toFile, "rw").getChannel) 
     } 

     /** 
     * Creates a file in the temp directory with name bmcs-buffer-part-$partNumber 
     * @param partNumber the part number in multipart upload. 
     * @return 
     * TODO:add unique id to the file name. for multiple 
     */ 
     private def partFile(partNumber: Int): Path = 
     tempDir.resolve(s"bmcs-buffer-part-$partNumber.bin") 
     setHandlers(in, out, this) 
    } 

    case class PartBuffer(path: Path, fileChannel: FileChannel) //TODO: see if you need mapped byte buffer. might be ok with just output stream/channel. 

}