2014-09-11 7 views
2

У меня есть вычисление, которое я распараллеливание с помощью PLINQ следующим образом:связывания источника нити в PLINQ

  • Источник IEnumerable<T> source предоставляет объекты для чтения из файла .

  • У меня есть супертяжелом вычисление HeavyComputation мне нужно сделать на каждый T, и я хочу это откуп между потоками, поэтому я с помощью PLINQ, как: AsParallel().Select(HeavyComputation)

Вот где начинается самое интересное : из-за ограничений на файл тип считывателя, который предоставляет source, мне нужно source быть , перечисленным в начальной нити, а не на параллельных рабочих --- мне нужно полную оценку sourceСвязанные Главным образом нить. Однако кажется, что источник фактически перечисляется на рабочих потоках .

Мой вопрос: Есть ли простой способ изменить этот код связывают перечисление source к исходной теме, в то время как откуп тяжелую работу на параллельных рабочих? Имейте в виду, что просто делает желающий .ToList() до AsParallel() здесь не является вариантом, , поскольку поток данных, поступающий из файла, массивный.

Вот пример кода, который демонстрирует проблемы, как я это вижу:

using System.Threading; 
using System.Collections.Generic; 
using System.Linq; 
using System; 

public class PlinqTest 
{ 
     private static string FormatItems<T>(IEnumerable<T> source) 
     { 
       return String.Format("[{0}]", String.Join(";", source)); 
     } 

     public static void Main() 
     { 
      var expectedThreadIds = new[] { Thread.CurrentThread.ManagedThreadId }; 

      var threadIds = Enumerable.Range(1, 1000) 
        .Select(x => Thread.CurrentThread.ManagedThreadId) // (1) 
        .AsParallel() 
        .WithDegreeOfParallelism(8) 
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 
        .AsOrdered() 
        .Select(x => x)         // (2) 
        .ToArray(); 

      // In the computation above, the lambda in (1) is a 
      // stand in for the file-reading operation that we 
      // want to be bound to the main thread, while the 
      // lambda in (2) is a stand-in for the "expensive 
      // computation" that we want to be farmed out to the 
      // parallel worker threads. In fact, (1) is being 
      // executed on all threads, as can be seen from the 
      // output. 

      Console.WriteLine("Expected thread IDs: {0}", 
           FormatItems(expectedThreadIds)); 
      Console.WriteLine("Found thread IDs: {0}", 
           FormatItems(threadIds.Distinct())); 
     } 
} 

Пример выходных я получаю:

Expected thread IDs: [1] 
Found thread IDs: [7;4;8;6;11;5;10;9] 
+0

Вы можете запросить перечислимую сумму (1) и запустить задачи работника в цикле. Ожидайте всех задач в конце цикла. –

+0

Спасибо @Asad, я не уверен, что это позволит мне контролировать степень параллелизма. Не так ли? Возможно, если бы вы могли дать более подробную информацию. –

ответ

1

Это довольно просто (хотя, возможно, не сжат), если вы отказываетесь от PLINQ и просто используете параллельную библиотеку задач:

// Limits the parallelism of the "expensive task" 
var semaphore = new SemaphoreSlim(8); 

var tasks = Enumerable.Range(1, 1000) 
    .Select(x => Thread.CurrentThread.ManagedThreadId) 
    .Select(async x => 
    { 
     await semaphore.WaitAsync(); 
     var result = await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId)); 
     semaphore.Release(); 

     return result; 
    }); 

return Task.WhenAll(tasks).Result; 

Обратите внимание, что Я использую Tuple.Create для записи идентификатора потока, исходящего из основного потока, и идентификатора потока, исходящего из заданной задачи. Из моего теста первое всегда одно и то же для каждого кортежа, в то время как последнее варьируется, как и должно быть.

Семафор гарантирует, что степень параллелизма никогда не будет выше 8 (хотя с невысокой задачей создания кортежа это вряд ли так или иначе). Если вы доберетесь до 8, любые новые задачи будут ждать, пока на семафоре появятся пятна.

+0

Это действительно приятно. Мне особенно нравится использование семафора здесь. Я определенно * не * женился на PLINQ ... давайте просто удостовериться, что это работает в моем коде завтра, тогда я приму ответ –

+0

Единственный потенциальный недостаток, который я вижу здесь, заключается в том, что вывод не является потоковым. Не обязательно огромная проблема - это, по сути, проблема с сокращением данных, поэтому выход намного меньше и может храниться в памяти за один раз. Просто любопытно, может ли быть способ восстановить потоковое поведение. Я думаю об этом. –

+0

@DavidAlexander По потоку, вы имеете в виду, что хотите получить результаты по мере их завершения? –