2014-01-16 3 views
13

Я хочу выполнить запрос по потоку данных при обработке элементов параллельно с определенной степенью параллелизма. Обычно я использую PLINQ для этого, но мои рабочие элементы не связаны с ЦП, а связаны с IO. Я хочу использовать async IO. PLINQ не поддерживает работу async.Есть ли асинхронная версия PLINQ?

Каков самый умный способ запуска запроса в стиле PLINQ, но с асинхронными рабочими элементами?


Вот более подробные иллюстрации проблемы:

Моя цель состоит в том, чтобы обрабатывать потенциально бесконечный поток «элементов» в пути, который логически описывается следующим запросом:

var items = new int[10]; //simulate data 

var results = 
from x in items.AsParallel().WithDegreeOfParallelism(100) 
where Predicate(x) 
select ComputeSomeValue(x); 

foreach (var result in results) 
PerformSomeAction(result); 

Этот запрос представляет собой всего лишь эскиз реального запроса. Теперь я хочу, чтобы каждая из функций-заполнителей была асинхронной (возвращает Task и внутренне базируется на async IO).

Обратите внимание, что в памяти может быть гораздо больше предметов, чем может быть сохранено в памяти. Я также должен контролировать степень параллелизма, чтобы максимизировать базовую сеть и дисковое оборудование.

Этот вопрос не касается многоядерных процессоров. Он полностью применим к машинам с только одним ядром ЦП, поскольку IO все еще может выиграть от параллелизма. Подумайте о медленных вызовах веб-сервиса и тому подобных.

+2

+1, отличный вопрос. Интересно, можете ли вы использовать порты завершения на стороне ввода-вывода для достижения параллелизма? Отказ от ответственности: я использовал их много на C++, но никогда не C#. –

+2

Вы сказали, что «рабочие элементы не связаны с ЦП, а связаны с IO». Следовательно, большое количество ядер и параллелизм CPU не будут иметь слишком большой поддержки. Я имею в виду, что если использование ЦП низкое, а использование I/O является высоким для этих двух операций, тогда создайте n = 10 прикованных действий (ComputeSomeValue, продолжение по PerformSomeAction) и начните цепочку последовательно. * Новая задача (ComputeSomeValue) .ContinueWith (...) * и т. Д. –

+0

Вы можете применить этот фильтр, который у вас есть с PLINQ (параллелизм процессора), но это все ... Вам нужно запускать задачи для части ввода-вывода. . ИМХО. –

ответ

2

Как указано here, PLINQ для запуска LINQ запросов параллельно на многоядерных/многопроцессорных системах. Это не слишком много касается прохладных систем с большим количеством дисковых блоков и супер сетевых возможностей. AFAIK, он предназначен для запуска исполняемого кода на большее количество ядер, а не для одновременной отправки нескольких запросов ввода-вывода в операционную систему.

Возможно, ваш Предикат (x) связан с ЦП, поэтому вы можете выполнить эту операцию фильтрации с помощью PLINQ. Но вы не можете применять операции ввода-вывода (ComputeSomeValue, PerformSomeAction) таким же образом.

Что вы можете сделать, так это определить цепочку операций (по два в вашем случае) для каждого элемента (см. continuation tasks) и отправить эту цепочку (последовательно (?)).

Также вы упомянули что-то о «Бесконечный поток предметов». Это может звучать как проблема производителя-потребителя - если эти элементы также сгенерированы ввода-вывода.

Может быть, ваша проблема не в том многоядерные дружественной ... Это может быть только I/O требовательны, вот и все ...

+2

Загрузка процессора не имеет значения. Но я требую async IO и очень высокую степень параллелизма, чтобы максимизировать IO. Вопрос полностью применим к машинам с одним ядром ЦП, поскольку доработки ввода-вывода будут мультиплексированы на одно ядро; Я мог бы определить «цепочку операторов», но слишком много элементов, чтобы сразу запустить всю работу. Мне нужен дросселированный/гарантированный уровень параллелизма. – usr

+0

@usr, который звучит точно так же, как 'TPL Dataflow' – i3arnon

5

Это походит на работу реактивной рамках Microsoft.

Я начал с этим кодом в качестве моих исходных переменных:

var items = Enumerable.Range(0, 10).ToArray(); 

Func<int, bool> Predicate = x => x % 2 == 0; 

Func<int, int> ComputeSomeValue = x => 
{ 
    Thread.Sleep(10000); 
    return x * 3; 
}; 

Теперь я использовал регулярный запрос LINQ в качестве базовой линии:

var results = 
    from x in items 
    where Predicate(x) 
    select ComputeSomeValue(x); 

Это заняло 50 секунд, чтобы вычислить следующие результаты :

enumerable

Затем я перешел к набл ervable (реактивная основа) запроса:

var results = 
    from x in items.ToObservable() 
    where Predicate(x) 
    from y in Observable.Start(() => ComputeSomeValue(x)) 
    select y; 

Это заняло 10 секунд, чтобы получить:

observable

Это явно вычисляя параллельно.

Однако результаты не соответствуют действительности. Поэтому я изменил запрос к этому:

var query = 
    from x in items.ToObservable() 
    where Predicate(x) 
    from y in Observable.Start(() => ComputeSomeValue(x)) 
    select new { x, y }; 

var results = 
    query 
     .ToEnumerable() 
     .OrderBy(z => z.x) 
     .Select(z => z.y); 

Это все еще потребовалось 10 секунд, но я получил результат обратно в правильном порядке.

Теперь единственным вопросом является WithDegreeOfParallelism. Здесь есть купе вещей.

Сначала я изменил код, чтобы произвести 10 000 значений с 10 мс. Мой стандартный запрос LINQ продолжался 50 секунд. Но реактивный запрос занял 6,3 секунды. Если бы он мог выполнять все вычисления в одно и то же время, он должен был бы принимать гораздо меньше. Это показывает, что он максимизирует асинхронный конвейер.

Второй момент заключается в том, что реактивная каркас использует планировщики для всего планирования работы. Вы можете попробовать множество планировщиков, которые поставляются с реактивной каркасной платформой, чтобы найти альтернативу, если встроенный не сделает то, что вы хотите. Или вы даже можете написать свой собственный планировщик, чтобы делать все, что вам нравится.


Вот версия запроса, которая также вычисляет предикат.

var results = 
    from x in items.ToObservable() 
    from p in Observable.Start(() => Predicate(x)) 
    where p 
    from y in Observable.Start(() => ComputeSomeValue(x)) 
    select new { x, y }; 
+0

Я раньше не использовал Rx. Может ли предикат и ComputeSomeValue быть полностью асинхронным (возвращая задачу)? В вашем примере это будет Task.Delay. – usr

+0

Вы можете конвертировать наблюдаемые задачи и задачи в наблюдаемые, используя реактивную среду. Но по моему опыту реактивная каркас дает гораздо более сжатый код, чем TPL. Я предлагаю делать ваши вычисления исключительно в реактивной среде. – Enigmativity

+0

@usr - вы также можете сделать вызов «Predicate» наблюдаемым вызовом, заставляя его работать параллельно. Реактивная среда делает хорошую работу по смешению планирования всех частей запроса, чтобы он выполнялся эффективно. – Enigmativity

 Смежные вопросы

  • Нет связанных вопросов^_^