У меня поток потоков, цель которых вычислить простой «контрольная сумма» содержимого в наборе .zip-файлов.Observable.Using и async streams поврежденные данные
Чтобы сделать это, я поставил наблюдаемым, что:
- принимает все файлы в данной папке
- считывает содержимое каждого файла (чтение как
ZipArchive
) - для каждой записи в каждом файле, выполняет вычисление контрольной суммы
Чтобы проиллюстрировать это, я создал этот экс обильно:
ВНИМАНИЯ использование AsyncContext.Run
(https://stackoverflow.com/a/9212343/1025407), чтобы сделать метод Main
ждать GetChecksum
, так как это консольное приложение
namespace DisposePoC
{
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Linq;
using System.Threading.Tasks;
class Program
{
private static void Main()
{
AsyncContext.Run(GetChecksums);
}
private static async Task<IList<byte>> GetChecksums()
{
var bytes = Directory.EnumerateFiles("FolderWithZips")
.ToObservable()
.SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
.SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));
return await bytes.ToList();
}
private static ZipArchive CreateZipArchive(string path)
{
return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
}
private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
{
var bytes = await GetBytesFromStream(stream, entryLength);
return bytes.Aggregate((b1, b2) => (byte) (b1^b2));
}
private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
{
byte[] bytes = new byte[entryLength];
await stream.ReadAsync(bytes, 0, (int)entryLength);
return bytes;
}
}
}
Запуска приложения, я получаю все виды ошибок:
'System.IO.InvalidDataException': заголовок локального файла поврежден. 'System.NotSupportedException': поток не поддерживает чтение. 'System.ObjectDisposedException': не удается получить доступ к удаленному объекту. 'System.IO.InvalidDataException': длина блока не соответствует его дополнению.
Что я делаю неправильно?
У меня проблема с наблюдаемым сам по себе, или это потому, что ZipArchive
не является потокобезопасным? Если это не так, как мне заставить код работать?
Я сделаю это комментарием, так как я не могу проверить в-коды на данный момент, но я подозреваю, что проблема в том, что ZipArchives создается в первую SelectMany в настоящее время утилизированы в заявлении Использования прежде чем вы сможете прочитать потоки записи в следующей строке - по существу, одноразовая область видимости неверна. Я бы переместил логику из второго SelectMany в первый. Я также хотел бы убедиться, что ваши тестовые данные не повреждены, как указано в первом исключении. – Andrew
Думаю, я вижу вашу точку зрения. Но если область не соответствует действительности, то как мне изменить код, чтобы избежать удаления каждого ZipArchive до тех пор, пока все записи не будут обработаны? Возможно ли это? – SuperJMN