2016-11-25 5 views
0

У меня поток потоков, цель которых вычислить простой «контрольная сумма» содержимого в наборе .zip-файлов.Observable.Using и async streams поврежденные данные

Чтобы сделать это, я поставил наблюдаемым, что:

  1. принимает все файлы в данной папке
  2. считывает содержимое каждого файла (чтение как ZipArchive)
  3. для каждой записи в каждом файле, выполняет вычисление контрольной суммы

Чтобы проиллюстрировать это, я создал этот экс обильно:

ВНИМАНИЯ использование AsyncContext.Run (https://stackoverflow.com/a/9212343/1025407), чтобы сделать метод Main ждать GetChecksum, так как это консольное приложение

namespace DisposePoC 
{ 
    using System.Collections.Generic; 
    using System.IO; 
    using System.IO.Compression; 
    using System.Reactive.Linq; 
    using Nito.AsyncEx; 
    using System.Linq; 
    using System.Threading.Tasks; 


    class Program 
    { 
     private static void Main() 
     { 
      AsyncContext.Run(GetChecksums); 
     } 

     private static async Task<IList<byte>> GetChecksums() 
     { 
      var bytes = Directory.EnumerateFiles("FolderWithZips") 
       .ToObservable() 
       .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable())) 
       .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length)))); 

      return await bytes.ToList(); 
     } 

     private static ZipArchive CreateZipArchive(string path) 
     { 
      return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read)); 
     } 

     private static async Task<byte> CalculateChecksum(Stream stream, long entryLength) 
     { 
      var bytes = await GetBytesFromStream(stream, entryLength); 
      return bytes.Aggregate((b1, b2) => (byte) (b1^b2)); 
     } 

     private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength) 
     { 
      byte[] bytes = new byte[entryLength]; 
      await stream.ReadAsync(bytes, 0, (int)entryLength); 
      return bytes;    
     } 
    } 
} 

Запуска приложения, я получаю все виды ошибок:

'System.IO.InvalidDataException': заголовок локального файла поврежден. 'System.NotSupportedException': поток не поддерживает чтение. 'System.ObjectDisposedException': не удается получить доступ к удаленному объекту. 'System.IO.InvalidDataException': длина блока не соответствует его дополнению.

Что я делаю неправильно?

У меня проблема с наблюдаемым сам по себе, или это потому, что ZipArchive не является потокобезопасным? Если это не так, как мне заставить код работать?

+1

Я сделаю это комментарием, так как я не могу проверить в-коды на данный момент, но я подозреваю, что проблема в том, что ZipArchives создается в первую SelectMany в настоящее время утилизированы в заявлении Использования прежде чем вы сможете прочитать потоки записи в следующей строке - по существу, одноразовая область видимости неверна. Я бы переместил логику из второго SelectMany в первый. Я также хотел бы убедиться, что ваши тестовые данные не повреждены, как указано в первом исключении. – Andrew

+0

Думаю, я вижу вашу точку зрения. Но если область не соответствует действительности, то как мне изменить код, чтобы избежать удаления каждого ZipArchive до тех пор, пока все записи не будут обработаны? Возможно ли это? – SuperJMN

ответ

1

В вашей проблеме нет ничего «Rx».

Если вы моды всего дела в императивном набор петель он работает отлично

private static async Task<IList<byte>> GetChecksums() 
{ 
    var bytes = new List<byte>(); 
    foreach (var path in Directory.EnumerateFiles("FolderWithZips")) 
    { 
     using (var archive = CreateZipArchive(path)) 
     { 
      foreach (var entry in archive.Entries) 
      { 
       using (var stream = entry.Open()) 
       { 
        var checksum = await CalculateChecksum(stream, entry.Length); 
        bytes.Add(checksum); 
       } 
      } 
     } 
    } 

    return bytes; 
} 

Так что я предположил бы, что у вас есть набор условий гонки (параллелизм) и/или из вопросов утилизации заказа.

+0

Я думал, что Observable.Using будет обрабатывать распоряжения потоками в правильном порядке, поэтому я не получу ObjectDisposedExceptions.Я использую это неправильно или это проблема, связанная с природой проблемы? (чтение из ZipArchive одновременно) – SuperJMN

+1

Observable.Using будет распоряжаться ресурсом, созданным предоставленной фабрикой, когда последовательность завершается (dispose/error/complete). Но это все академическое, потому что вы вынуждаете Rx в проблему, которая не является реактивной. Это основная проблема, которую я вижу в периферийных проблемах, - это ненужное введение потоковой передачи, не предоставляя IScheduler для 2 (ненужных) вызовов ToObservable() –

2

Rx, вероятно, не подходит для этого. Честно говоря, вы можете даже сделать это без async.

Directory.EnumerateFiles("FolderWithZips") 
     .AsParallel() 
     .Select(folder => CalculateChecksum(folder)) 
     .ToList() 
+0

Ну, CalculateChecksum является просто примером для упрощения вопроса. В моей реальной жизненной проблеме это асинхронный метод, который я не могу изменить (сторонняя сторона). Как это изменить ваш подход? (будучи асинхронным) – SuperJMN