я часто вернуться к чисто функциональные, не-Акку методы для таких проблем, как это и затем «поднять» эти функции в AKKA конструкций , Под этим я подразумеваю, что я стараюсь использовать только SCALA «материал», а затем попытаться обернуть этот материал внутри Акку позже ...
Создание файла
Начиная с FileOutputStream
создания на основе «генерируется случайным образом имена ":
def randomFileNameGenerator : String = ??? //not specified in question
import java.io.FileOutputStream
val randomFileOutGenerator :() => FileOutputStream =
() => new FileOutputStream(randomFileNameGenerator)
Государственные
Там должен быть какой-то способ хранения„состояние“текущего файла, например, количество байтов уже написано:
case class FileState(byteCount : Int = 0,
fileOut : FileOutputStream = randomFileOutGenerator())
Файл Запись
Сначала мы определяем, если бы мы прорвать порог максимального размера файла с заданным ByteString
:
import akka.util.ByteString
val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
(state, byteString, maxBytes) =>
state.byteCount + byteString.length > maxBytes
Мы тогда имеем для записи функции, которая создает новый FileState
, если мы увеличили емкость текущего или вернули текущее состояние, если оно все еще ниже емкости:
val closeFileInState : FileState => Unit =
(_ : FileState).fileOut.close()
val getCurrentFileState(FileState, ByteString, Int) => FileState =
(state, byteString, maxBytes) =>
if(isEndOfChunk(maxBytes, state, byteString)) {
closeFileInState(state)
FileState()
}
else
state
Единственное, что осталось, чтобы написать в FileOutputStream
:
val writeToFileAndReturn(FileState, ByteString) => FileState =
(fileState, byteString) => {
fileState.fileOut write byteString.toArray
fileState copy (byteCount = fileState.byteCount + byteString.size)
}
//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =
writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)
Fold на любой GenTraversableOnce
В Скале GenTraversableOnce
является любая коллекция, параллельно или нет, что имеет . К ним относятся Iterator, Vector, Array, Seq, scala stream, ...Th Окончательный writeToChunkedFile
функция идеально соответствует подписи GenTraversableOnce#fold:
val anyIterable : Iterable = ???
val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))
Один последний свободный конец; последние FileOutputStream
также должны быть закрыты. Так как раз будет издавать только, что последний FileState
мы можем закрыть, что один:
closeFileInState(finalFileState)
Akka Streams
Akka Flow получает его fold
из FlowOps#fold, которое происходит в соответствии с GenTraversableOnce
подпись. Поэтому мы можем «поднять» наши регулярные функции в значениях потока подобно тому, как мы использовали Iterable
раз:
import akka.stream.scaladsl.Flow
def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] =
Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))
Хорошая часть об обработке проблемы с регулярными функциями является то, что они могут быть использованы в других асинхронных структур за пределами потоков , например Фьючерсы или актеры. Вы также не нуждаетесь в akka ActorSystem
в модульном тестировании, а только в обычных структурах данных языка.
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
def byteStringSink(maxBytes : Int) : Sink[ByteString, _] =
chunkerFlow(maxBytes) to (Sink foreach closeFileInState)
Вы можете использовать этот Sink
стечь HttpEntity
исходя из HttpRequest
.
Wow thanks, это действительно подробный ответ! Не было этого. Любая разница между этим подходом с 'fold' и с той же логикой (сохраняющая состояние и создающая вручную выходной поток) внутри' GraphStageLogic'? Например (на основе вашего комментария + предыдущей ссылки, которую я дал) http://pastebin.com/tzLFAmzk? Небольшой плюс заключается в том, что он позволяет передавать файлы сразу после их создания (но код более длинный и более подверженный ошибкам). – Vuzi
@Vuzi Добро пожаловать. Основное различие, которое я видел между моим подходом и «GraphStageLogic» в «реальном мире», тестировалось. С моими функциями все тестирование может быть выполнено без «ActorSystem», «ActorMaterializer» и «ExecutionContext». Если вы поместите свою логику в конструкцию потока akka, вам понадобится вся инфраструктура akka для проверки этой логики. Счастливый взлом. –