2014-10-28 1 views
13

У меня есть код, как это:Как запустить LINQ 'let' в параллель?

var list = new List<int> {1, 2, 3, 4, 5}; 

var result = from x in list.AsParallel() 
      let a = LongRunningCalc1(x) 
      let b = LongRunningCalc2(x) 
      select new {a, b}; 

Скажем, LongRunningCalc методы принимают на 1 секунду. Приведенный выше код занимает около 2 секунд для запуска, потому что, когда список из 5 элементов работает параллельно, два метода, вызываемые из операторов let, называются последовательно.

Однако эти методы можно смело вызывать параллельно. Им, очевидно, нужно объединиться для select, но до этого следует работать параллельно - select должен их ждать.

Есть ли способ достичь этого?

ответ

7

Вы не сможете использовать синтаксис запроса или операции let, но вы можете написать метод для выполнения нескольких операций для каждого элемента параллельно:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<TResult1, TResult2, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     return resultAggregator(result1.Result, result2.Result); 
    }); 
} 

Это позволит вам писать:

var query = list.AsParallel() 
    .SelectAll(LongRunningCalc1, 
     LongRunningCalc2, 
     (a, b) => new {a, b}) 

Вы можете добавить перегрузки для дополнительных параллельных операций, а также:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal> 
    (this ParallelQuery<T> query, 
    Func<T, TResult1> selector1, 
    Func<T, TResult2> selector2, 
    Func<T, TResult3> selector3, 
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator) 
{ 
    return query.Select(item => 
    { 
     var result1 = Task.Run(() => selector1(item)); 
     var result2 = Task.Run(() => selector2(item)); 
     var result3 = Task.Run(() => selector3(item)); 
     return resultAggregator(
      result1.Result, 
      result2.Result, 
      result3.Result); 
    }); 
} 

Можно написать версию, чтобы обрабатывать несколько селекторов не известен во время компиляции, но сделать это все, что нужно для вычисления значения одного и того же типа:

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    IEnumerable<Func<T, TResult>> selectors) 
{ 
    return query.Select(item => selectors.AsParallel() 
      .Select(selector => selector(item)) 
      .AsEnumerable()); 
} 
public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query, 
    params Func<T, TResult>[] selectors) 
{ 
    return SelectAll(query, selectors); 
} 
+0

Это не моя экспертная область, но вам не нужно ждать результатов задач? – Magnus

+0

@ Magnus Я уже есть. – Servy

+0

Yeh, вызывающий '.Result', ждет выполнение задачи. Я немного упростил свой пример кода, но это поставило меня на правильный путь, спасибо за помощь. – Lyall

0

Я бы сделать это с помощью Microsoft, реактивного Framework («Rx-Main» в NuGet).

Здесь:

var result = 
    from x in list.ToObservable() 
    from a in Observable.Start(() => LongRunningCalc1(x)) 
    from b in Observable.Start(() => LongRunningCalc2(x)) 
    select new {a, b}; 

Хорошая вещь, что вы можете получить доступ к результатам, поскольку они производятся с использованием метода .Subscribe(...):

result.Subscribe(x => /* Do something with x.a and/or x.b */); 

Супер просто!