0

Я пытаюсь обрабатывать большое количество текстовых файлов через Parallel.ForEach, добавляя обработанные данные в BlockingCollection.Можно ли преобразовать while (true) в EventWaitHandle?

Проблема заключается в том, что я хочу, чтобы taskWriteMergedFile taskWriteMergedFile потреблял коллекцию и записывал ее в файл результата по меньшей мере каждые 800000 строк.

Я думаю, что я не могу проверить размер коллекции в рамках итерации, потому что она параллельна, поэтому я создал Task.

Могу ли я преобразовать while (true) loop в задачу в EventWaitHandle в этом случае?

const int MAX_SIZE = 1000000; 
static BlockingCollection<string> mergeData; 
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE); 


string[] FilePaths = Directory.GetFiles("somepath"); 

var taskWriteMergedFile = new Task(() => 
{ 
    while (true) 
    { 
     if (mergeData.Count > 800000) 
     { 
      String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable()); 
      //Write to file 
     } 
     Thread.Sleep(10000); 
    } 
}, TaskCreationOptions.LongRunning); 

taskWriteMergedFile.Start(); 
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath)); 
mergeData.CompleteAdding(); 

ответ

1

Возможно, вы не хотите этого делать. Вместо этого попросите свою задачу записать каждую строку в файл по мере ее получения. Если вы хотите ограничить размер файла до 80 000 строк, то после записи 80 000-й строки закройте текущий файл и откройте новый.

Подумайте об этом, что у вас не получилось, потому что GetConsumingEnumerable() не остановится, пока коллекция не будет помечена как полная для добавления. Что произойдет, так это то, что вещь будет проходить через цикл сна до тех пор, пока в очереди не будет 80 000 элементов, а затем она будет блокироваться на String.Join, пока основной поток не вызовет CompleteAdding. С достаточным количеством данных вы исчерпали бы память.

Кроме того, если у вас нет веской причины, вы не должны использовать здесь ConcurrentBag. Просто используйте значение по умолчанию для BlockingCollection, которое равно ConcurrentQueue. ConcurrentBag - это структура данных специального назначения, которая не будет работать так же, как и ConcurrentQueue.

Так что ваша задача становится:

var taskWriteMergedFile = new Task(() => 
{ 
    int recordCount = 0; 
    foreach (var line in mergeData.GetConsumingEnumerable()) 
    { 
     outputFile.WriteLine(line); 
     ++recordCount; 
     if (recordCount == 80,000) 
     { 
      // If you want to do something after 80,000 lines, do it here 
      // and then reset the record count 
      recordCount = 0; 
     } 
    } 
}, TaskCreationOptions.LongRunning); 

Это предполагает, конечно, что вы открыли выходной файл где-то в другом месте. Вероятно, лучше открыть выход в начале задания и закрыть его после выхода foreach.

С другой стороны, вы, вероятно, не хотите, чтобы ваш цикл производителя был параллельным. У вас есть:

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath)); 

Я не знаю точно, что AddToDataPool делает, но если это читает файл и записывает данные в коллекции, есть несколько проблем. Во-первых, дисковод может выполнять только одну вещь за раз, поэтому заканчивается чтение части одного файла, затем части другой, затем части другой и т. Д. Чтобы читать каждый фрагмент следующего файла, он должен ищите голову в правильное положение. Задачи головок дисков невероятно дороги - 5 миллисекунд или более. Вечность в процессорном времени. Если вы не выполняете тяжелую обработку, которая занимает гораздо больше времени, чем чтение файла, вы почти всегда лучше обрабатываете один файл за раз. Если вы не можете гарантировать, что входные файлы находятся на отдельных физических дисках. , ,

Вторая потенциальная проблема заключается в том, что при использовании нескольких потоков вы не можете гарантировать порядок, в котором вещи записываются в коллекцию. Конечно, это не проблема, но если вы ожидаете, что все данные из одного файла будут сгруппированы вместе на выходе, это не произойдет с несколькими потоками, каждый из которых записывает несколько строк в коллекцию.

Просто что-то иметь в виду.