Мне нужно обработать List<T>
из тысяч элементов.C# Split List <T> в группы с использованием TPL Parallel ForEach
Прежде всего, мне нужно сгруппировать элементы по годам и типам, поэтому я получаю List<List<T>>
. Затем для каждого внутреннего List<T>
я хочу добавить объекты типа T до тех пор, пока не будет достигнут максимальный размер пакета для List<T>
, тогда я создам новый пакет и продолжаю то же самое.
Я хочу использовать Parallel.ForEach
петлю.
Моя фактическая реализация работает хорошо, если я запускаю ее последовательно, но логика не является потоковой безопасностью, и я хочу ее изменить.
Я думаю, что проблема заключается в внутреннем цикле Parallel.ForEach
, когда достигнут максимальный размер для List<T>
, и я создаю новый List<T>
внутри той же ссылки.
private ConcurrentBag<ConcurrentBag<DumpDocument>> InitializePackages()
{
// Group by Type and Year
ConcurrentBag<ConcurrentBag<DumpDocument>> groups = new ConcurrentBag<ConcurrentBag<DumpDocument>>(Dump.DumpDocuments.GroupBy(d => new { d.Type, d.Year })
.Select(g => new ConcurrentBag<DumpDocument> (g.ToList()))
.ToList());
// Documents lists with max package dimension
ConcurrentBag<ConcurrentBag<DumpDocument>> documentGroups = new ConcurrentBag<ConcurrentBag<DumpDocument>>();
foreach (ConcurrentBag<DumpDocument> group in groups)
{
long currentPackageSize = 0;
ConcurrentBag<DumpDocument> documentGroup = new ConcurrentBag<DumpDocument>();
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Parameters.MaxDegreeOfParallelism };
Parallel.ForEach(group, options, new Action<DumpDocument, ParallelLoopState>((DumpDocument document, ParallelLoopState state) =>
{
long currentDocumentSize = new FileInfo(document.FilePath).Length;
// If MaxPackageSize = 0 then no splitting to apply and the process works well
if (Parameters.MaxPackageSize > 0 && currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize)
{
documentGroups.Add(documentGroup);
// Here's the problem!
documentGroup = new ConcurrentBag<DumpDocument>();
currentPackageSize = 0;
}
documentGroup.Add(document);
currentPackageSize += currentDocumentSize;
}));
if (documentGroup.Count > 0)
documentGroups.Add(documentGroup);
}
return documentGroups;
}
public class DumpDocument
{
public string Id { get; set; }
public long Type { get; set; }
public string MimeType { get; set; }
public int Year { get; set; }
public string FilePath { get; set; }
}
Поскольку моя операция довольно проста, на самом деле мне нужно только, чтобы получить размер файла с помощью:
long currentDocumentSize = new FileInfo(document.FilePath).Length;
Я читал вокруг, что я могу также использовать Partitioner
, но я никогда не использовал, что и в любом случае это не мой приоритет на данный момент.
Я тоже уже читал это question, что похоже, но не решает мою проблему с внутренним циклом.
UPDATE 28/12/2016
Я обновил код, чтобы соответствовать требованиям проверки.
Вы просто пытаетесь ускорить процесс параллелизма? – Enigmativity
В этом конкретном случае да. Я хочу ускорить инициализацию пакетов. Затем каждый отдельный пакет (ConcurrentBag) проходит через более сложный цикл Parallel ForEach, обрабатывающий документы. –
Мне кажется, что у вас есть список в памяти. Почти всегда быстрее обрабатывать данные по одному потоку, а не параллельно. Это только, если у вас есть тяжелая обработка, что стоит делать что-либо параллельно. – Enigmativity