2016-06-20 6 views
2

У меня есть процесс трубопровод с 3 этапом:Реактивных расширения: Создание трубопровода с Rx, который работает с файлами

  1. видео к изображениям: У меня есть видео, которое преобразуется в стоп-кадры (фреймы)
  2. Кадры для zip-файла: Когда все кадры в видео обработаны, я должен создать с ними Zip-файл.
  3. зип файл => Загрузить на FTP

Она включает в себя 2: одноразовые видео захвата и почтовый файл.

Как я могу справиться с этим с помощью Rx? Есть идеи? Извините, что не публиковал какой-либо код, я не знаю, с чего начать.

Заранее благодарен!

+0

Начните с разбивки кода на эти три шага. Затем убедитесь, что каждый из этих методов возвращает «IObservable » для представления асинхронного характера перекодирования/записи/загрузки. Наконец, используйте SelectMany/Concat/Merge/etc для создания конвейера Rx, используя эти 3 метода. –

+0

Спасибо, Ли! Дело в том, что я не знаю, действительно ли имеет смысл создать IObservabe , так как эта абстракция на самом деле не представляет данные, а абстракцию для создания .zip. Если мне удастся создать такой вид наблюдения, я получаю поток удаленных ZipArchives. – SuperJMN

+0

В более общем плане, я чувствую, что прогнозы, которые в конечном итоге попадают в файл, в конечном итоге становятся сложными. Возможно, способ сделать это - спроектировать маркер (строка), который представляет имя видео и с помощью Do, для создания Zip и передачи маркера на следующий шаг в конвейере. Наконец, абонент получит токен, чтобы узнать, какой файл загрузить. – SuperJMN

ответ

5

Вам нужно пройти по необработанным объектам с каждого шага. Должно быть хорошо, что захват видео или zip-файл «удалены», потому что я уверен, что будет какое-то побочное действие, связанное с действием (MemoryStream, файл, записанный на диск и т. Д.). Вы можете просто передать указатели (Uri's?) К результатам каждого действия к следующей части конвейера?

void Main() 
{ 
    var video = new Uri("https://www.youtube.com/watch?v=Tp5mRlHwZ7M"); 
    var query = from frames in TranscodeVideoToImages(video) 
     from zipFile in ZipFiles(frames) 
     from uploadLocation in UploadFile(zipFile) 
     select uploadLocation; 
    query.Subscribe(...) 
} 
private IObservable<Uri[]> TranscodeVideoToImages(Uri imageSource) 
{ 
    //Do some long running (async) work here. 
    // Save work to disk 
    // Return location of saved work 
} 
private IObservable<Uri> ZipFiles(Uri[] files) 
{ 
    //Run the zip process on all of the files 
    // Return the location of the zip file 
} 
private IObservable<Uri> UploadFile(Uri source) 
{ 
    //Upload the File. 
    //Probably as simple as as task based operation with .ToObservable() 
}