1

Мне нужно обработать List<T> из тысяч элементов.C# Split List <T> в группы с использованием TPL Parallel ForEach

Прежде всего, мне нужно сгруппировать элементы по годам и типам, поэтому я получаю List<List<T>>. Затем для каждого внутреннего List<T> я хочу добавить объекты типа T до тех пор, пока не будет достигнут максимальный размер пакета для List<T>, тогда я создам новый пакет и продолжаю то же самое.

Я хочу использовать Parallel.ForEach петлю.

Моя фактическая реализация работает хорошо, если я запускаю ее последовательно, но логика не является потоковой безопасностью, и я хочу ее изменить.
Я думаю, что проблема заключается в внутреннем цикле Parallel.ForEach, когда достигнут максимальный размер для List<T>, и я создаю новый List<T> внутри той же ссылки.

private ConcurrentBag<ConcurrentBag<DumpDocument>> InitializePackages() 
{ 
    // Group by Type and Year 
    ConcurrentBag<ConcurrentBag<DumpDocument>> groups = new ConcurrentBag<ConcurrentBag<DumpDocument>>(Dump.DumpDocuments.GroupBy(d => new { d.Type, d.Year }) 
     .Select(g => new ConcurrentBag<DumpDocument> (g.ToList())) 
     .ToList()); 

    // Documents lists with max package dimension 
    ConcurrentBag<ConcurrentBag<DumpDocument>> documentGroups = new ConcurrentBag<ConcurrentBag<DumpDocument>>(); 

    foreach (ConcurrentBag<DumpDocument> group in groups) 
    {  
     long currentPackageSize = 0; 

     ConcurrentBag<DumpDocument> documentGroup = new ConcurrentBag<DumpDocument>(); 

     ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = Parameters.MaxDegreeOfParallelism }; 
     Parallel.ForEach(group, options, new Action<DumpDocument, ParallelLoopState>((DumpDocument document, ParallelLoopState state) => 
      { 
       long currentDocumentSize = new FileInfo(document.FilePath).Length; 

       // If MaxPackageSize = 0 then no splitting to apply and the process works well 
       if (Parameters.MaxPackageSize > 0 && currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize) 
       { 
        documentGroups.Add(documentGroup); 

        // Here's the problem! 
        documentGroup = new ConcurrentBag<DumpDocument>(); 

        currentPackageSize = 0; 
       } 

       documentGroup.Add(document); 
       currentPackageSize += currentDocumentSize; 
      })); 

     if (documentGroup.Count > 0) 
      documentGroups.Add(documentGroup); 
    } 

    return documentGroups; 
} 

public class DumpDocument 
{ 
    public string Id { get; set; } 
    public long Type { get; set; } 
    public string MimeType { get; set; } 
    public int Year { get; set; } 
    public string FilePath { get; set; } 
} 

Поскольку моя операция довольно проста, на самом деле мне нужно только, чтобы получить размер файла с помощью:

long currentDocumentSize = new FileInfo(document.FilePath).Length; 

Я читал вокруг, что я могу также использовать Partitioner, но я никогда не использовал, что и в любом случае это не мой приоритет на данный момент.

Я тоже уже читал это question, что похоже, но не решает мою проблему с внутренним циклом.

UPDATE 28/12/2016

Я обновил код, чтобы соответствовать требованиям проверки.

+0

Вы просто пытаетесь ускорить процесс параллелизма? – Enigmativity

+0

В этом конкретном случае да. Я хочу ускорить инициализацию пакетов. Затем каждый отдельный пакет (ConcurrentBag ) проходит через более сложный цикл Parallel ForEach, обрабатывающий документы. –

+0

Мне кажется, что у вас есть список в памяти. Почти всегда быстрее обрабатывать данные по одному потоку, а не параллельно. Это только, если у вас есть тяжелая обработка, что стоит делать что-либо параллельно. – Enigmativity

ответ

1

После обновления кода, кажется, что you'are с помощью ConcurrentBag так это еще один не-поточно-логика остается в вашем коде:

long currentPackageSize = 0; 
if (// .. && 
    currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize 
// ... 
{ 
    // ... 
    currentPackageSize += currentDocumentSize; 
} 

+= оператор не является атомарным, и вы будете определенно есть условие гонки там, и чтение значения переменной long не является потокобезопасным здесь. Вы можете ввести locks там, или использовать Interlocked class атомарно обновляет значение:

Interlocked.Add(ref currentPackageSize, currentDocumentSize); 
Interlocked.Exchange(ref currentPackageSize, 0); 
Interlocked.Read(ref currentPackageSize); 

Используя этот класс приведет к коду некоторые рефакторинга (я думаю, что использование CAS операций, таких как CompareExchange является более предпочтительным ваш случай), поэтому, возможно, для вас это самый простой способ использования замков. Вероятно, вы должны реализовать оба способа, проверить их и измерить время выполнения).

Кроме того, как вы можете видеть, экземпляр также не является потокобезопасным, поэтому вам пришлось либо заблокировать переменную (что приведет к паузе синхронизации потоков), либо реорганизовать ваш код на два шага: сначала вы получить все размеры файлов параллельно, после чего вы последовательно повторяете результаты, избегая условий гонки.

Что касается Partitioner, вы не должны использовать этот класс здесь, поскольку он обычно используется для планирования работы по процессору, а не для разделения результатов.

Однако я хотел бы отметить, что у вас есть некоторые незначительные проблемы кода:

  1. Вы можете удалить ToList() звонков внутри строителей ConcurrentBag, потому что он принимает IEnumerable, который у вас уже есть:

    ConcurrentBag<ConcurrentBag<DumpDocument>> groups = new ConcurrentBag<ConcurrentBag<DumpDocument>>(Dump.DumpDocuments.GroupBy(d => new { d.Type, d.Year }) 
        .Select(g => new ConcurrentBag<DumpDocument> (g))); 
    

    Это поможет вам избежать ненужных копий ваших сгруппированных данных

  2. Вы можете использовать var ключевое слово, чтобы избежать дублирования типов в коде (это просто пример строки, вы можете изменить его много раз через код):

    foreach (var group in groups) 
    
  3. Вы не должны использовать максимальную степень параллельности, если вы не зная, что вы делаете (и я думаю, что вы не):

    var options = new ParallelOptions { MaxDegreeOfParallelism = Parameters.MaxDegreeOfParallelism }; 
    

    TPL по умолчанию планировщик задач пытается настроить пул потоков и использование процессора для ваших задач, так и в целом это число должно быть равно Environment.ProcessorCount.

  4. Вы можете использовать lambda синтаксис для Parallel.ForEach, и не создают новый Action (вы также можете переместить этот код в процедуру метода):

    (document, state) => 
    { 
        long currentDocumentSize = new FileInfo(document.FilePath).Length; 
    
        // If MaxPackageSize = 0 then no splitting to apply and the process works well 
        if (Parameters.MaxPackageSize > 0 && currentPackageSize + currentDocumentSize > Parameters.MaxPackageSize) 
        { 
         documentGroups.Add(documentGroup); 
    
         // Here's the problem! 
         documentGroup = new ConcurrentBag<DumpDocument>(); 
    
         currentPackageSize = 0; 
        } 
    
        documentGroup.Add(document); 
        currentPackageSize += currentDocumentSize; 
    } 
    

    Лямбда правильно составлен, потому что вы уже имеют общую коллекцию (сумку), и есть перегрузка, которая принимает ParallelLoopState в качестве второго параметра.

+1

Спасибо! Я, наконец, реорганизовал код в 2 этапа. Но я также оценил ваш экскурс в классе Interlocked. Я использовал его на некоторых других блоках ParallelForEach в моем коде, особенно для счетчиков. –

+0

Добро пожаловать :) Да, для счетчиков такие приращения очень полезны. Удачи вам в ваших проектах. – VMAtm

 Смежные вопросы

  • Нет связанных вопросов^_^