2012-12-04 8 views
1

У меня проблемы со списком, который передается в широковещательный блок. Вот то, что я до сих пор (псевдо-код, как полный код база слишком долго):Параллельная структура (TDF), представляет собой глубокую копию коллекции, необходимой здесь?

private BroadcastBlock<List<Quote>> tempBCB; 
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb1; 
private TransformBlock<List<Quote>, Dictionary<int, IParentOrder>> tfb2; 
private BatchBlock<Dictionary<int, IParentOrder>> batchBlock; 
private JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]> joinBlock; 
private TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>,List<MySignal>> transformBlock; 

tempBCB = new BroadcastBlock<List<Quote>>(quoteList => { 
    return quoteList; 
    //return Cloning.CloneListCloneValues<Quote>(quoteList); 
}); 

tfb1 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>}); 

tfb2 = new TransformBlock<List<Quote>, Dictionary<int, IParentOrder>>(
    quotes => {//do something and return Dictionary<int, IParentOrder>}); 

batchBlock = new BatchBlock<Dictionary<int, IParentOrder>>(2); 

joinBlock = new JoinBlock<List<Quote>, Dictionary<int, IParentOrder>[]>(
    new GroupingDataflowBlockOptions { Greedy = false }); 

transformBlock = new TransformBlock<Tuple<List<Quote>, 
    Dictionary<int, IParentOrder>[]>, List<MySignal>>(
    tuple => { //do something and return List<MySignal>;}); 

//Linking 
tempBCB.LinkTo(tfb1); 
tempBCB.LinkTo(tfb2); 
tfb1.LinkTo(batchBlock); 
tfb2.LinkTo(batchBlock); 
tempBCB.LinkTo(joinBlock.Target1); 
batchBlock.LinkTo(joinBlock.Target2); 
joinBlock.LinkTo(transformBlock); 

Моя проблема заключается в том, что с текущей реализацией tempBCB я получаю странные результаты в конечном TransformBlock<TInput, TOutput>.

Например, Dictionary<int, IParentrOrder> коллекции как часть кортежа не имеют одинакового размера даже реализаций tfb1 и tfb2 на 100% идентичны.

Записанная строка в реализации tempBCB делает глубокую копию широковещательного списка, и это, похоже, решает проблему, но проблема в том, что эта глубокая копия делает мой код примерно в 10 раз медленнее, что на такой величине что мне нужно найти другое решение.

Прежде всего, я не совсем уверен, что это проблема или что это именно так замедляется, что приводит к тому, что параллельные операции выполняются так, как ожидалось, хотя ошибка все еще скрывается там.

Во-вторых, если отсутствие глубокой копии в блоке вещания вызывает эти проблемы, как я могу сделать это быстрее?

Вот мой глубоко скопировать код:

public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) 
    where TValue : ICloneable 
{ 
    List<TValue> ret = new List<TValue>(original.Count); 

    foreach (TValue entry in original) 
    { 
     ret.Add((TValue)entry.Clone()); 
    } 

    return ret; 
} 

Я мог бы потенциально кормить Quote[] вместо List<Quote> в вещательном блок, но я не вижу, как это помогло бы ускорить производительность глубокой копии.

Мои вопросы:

  • ли глубокая копия выдать реальный вопрос здесь (я сомневаюсь, потому что List<Quote>, повалили в широковещательный блок никогда не изменяется какой-либо из блоков преобразования)?
  • Если да, то почему и как сделать глубокую копию более эффективной?
+0

Можете ли вы создать самодостаточный, полный пример кода, демонстрирующий поведение? Я хочу сказать, что проблема - это копия, и вы не можете обойти ее (но это зависит от поведения, которое проявит ваш пример), но я не могу дублировать поведение с тем, что я читаю выше. – casperOne

+0

Кроме того, почему бы вам не ожидать, что 'tfb1' и' tfb2' не будут возвращать разные результаты? Если у вас есть отдельные блоки преобразования, я бы ожидал, что они будут делать разные вещи. – casperOne

+0

@casperOne вскоре опубликует самодостаточный код. Я ожидаю тех же результатов, потому что блоки генерируют одни и те же данные, но у меня будут инициализированы блоки с тем же Func, чтобы сделать его более ясным. –

ответ

1

Я отвечаю на свой вопрос, потому что в конечном итоге я решил проблему. Вопрос, предупрежденный svick, не был связан с тем, нужна ли List<Quote> глубокая копия в широковещательном блоке или нет (на самом деле это не требовало глубокой копии). Проблема была связана с широковещательным блоком, который был запрошен для завершения (полный набор распространения, равный true для связанных блоков потока данных) перед batchBlock, который также ссылается на joinBlock, потенциально передавая все элементы в joinBlock. Я просто вынул joinBlock, потому что я переписал блоки преобразования (теперь они возвращают свои собственные преобразованные элементы, а также исходный элемент, также делая устаревший элемент joinBlock.

Примечание о параллелизме в главном преобразовании: установка MaxDegreeOfParallelism на> 1 уже обеспечивает преимущества в производительности даже при такой легкой нагрузке, однако, он действительно умирает, когда бросал тяжелые рабочие нагрузки на него

Здесь полный код, который компилируется и работает (я переименовал некоторые классы, но структура остается, как описана выше):.

public class Test 
{ 
    private Stopwatch watch; 

    private BroadcastBlock<List<InputObject>> tempBCB; 
    private BatchBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> batchBlock; 
    private TransformBlock<Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>[], List<FinalObject>> transformBlock; 
    private ActionBlock<List<FinalObject>> justToFlushTransformBlock; 

    private CoreLogic core1; 
    private CoreLogic core2; 

    public Test() 
    { 
     tempBCB = new BroadcastBlock<List<InputObject>>(input => input); 

     //here batch size = 2 
     batchBlock = new BatchBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>>(2, new GroupingDataflowBlockOptions { Greedy = false }); 

     transformBlock = new TransformBlock<Tuple<List<InputObject>,Dictionary<int,IntermediateObject>>[],List<FinalObject>>(array => 
     { 
      List<InputObject> inputObjects = array[0].Item1; 
      List<FinalObject> ret = inputObjects.ConvertAll(x => new FinalObject(x)); 

      foreach (var tuple in array) 
      { 
       //iterate over each individual object 
       foreach (var dictionary in tuple.Item2) 
       { 
        ret[dictionary.Key].outputList.Add(dictionary.Value); 
       } 
      } 

      return ret; 
     }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); 

     justToFlushTransformBlock = new ActionBlock<List<FinalObject>>(list => 
      { 
       //just in order to accept items from the transformBlock output queue 
      }); 

     //Generate 2 CoreLogic objects 
     core1 = new CoreLogic(); 
     core2 = new CoreLogic(); 

     //linking 
     tempBCB.LinkTo(core1.transformBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
     tempBCB.LinkTo(core2.transformBlock, new DataflowLinkOptions { PropagateCompletion = true }); 

     core1.transformBlock.LinkTo(batchBlock); 
     core2.transformBlock.LinkTo(batchBlock); 

     batchBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true }); 

     transformBlock.LinkTo(justToFlushTransformBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public void Start() 
    { 
     const int numberChunks = 30; 

     watch = new Stopwatch(); 
     watch.Start(); 

     for (int j = 1; j <= numberChunks; j++) 
     { 
      int collectionSize = 10000 * j; 

      List<InputObject> collection = new List<InputObject>(collectionSize); 
      for (int i = 0; i < collectionSize; i++) 
      { 
       collection.Add(new InputObject(i)); 
      } 

      tempBCB.Post(collection); 
     } 

     tempBCB.Complete(); 

     Task.WhenAll(core1.transformBlock.Completion, core2.transformBlock.Completion).ContinueWith(_ => 
      { 
       batchBlock.Complete(); 
      }); 

     transformBlock.Completion.Wait(); 

     watch.Stop(); 

     Console.WriteLine("Elapsed time (in milliseconds): " + watch.ElapsedMilliseconds); 
     Console.ReadLine(); 
    } 
} 

public class CoreLogic 
{ 
    private Random rand; 
    public TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>> transformBlock; 

    public CoreLogic() 
    { 
     const int numberIntermediateObjects = 10000; 

     transformBlock = new TransformBlock<List<InputObject>, Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>>(input => 
     { 
      //please ignore the fact that `input` is not utilized here, the point is to generate a collection of IntermediateObject and return 

      Dictionary<int, IntermediateObject> ret = new Dictionary<int, IntermediateObject>(); 
      for (int i = 0; i < numberIntermediateObjects; i++) 
      { 
       IntermediateObject value = new IntermediateObject(i); 

       ret.Add(i, value); 
      } 

      var tuple = new Tuple<List<InputObject>, Dictionary<int, IntermediateObject>>(input, ret); 

      return tuple; 
     }); 
    } 
} 

public class InputObject : ICloneable 
{ 
    public int value1 { get; private set; } 

    public InputObject(int value) 
    { 
     this.value1 = value; 
    } 

    object ICloneable.Clone() 
    { 
     return Clone(); 
    } 

    public InputObject Clone() 
    { 
     return (InputObject)this.MemberwiseClone(); 
    } 
} 

public class IntermediateObject 
{ 
    public int value1 { get; private set; } 

    public IntermediateObject(int value) 
    { 
     this.value1 = value; 
    } 
} 

public class FinalObject 
{ 
    public InputObject input { get; private set; } 
    public List<IntermediateObject> outputList; 

    public FinalObject(InputObject input) 
    { 
     this.input = input; 

     this.outputList = new List<IntermediateObject>(); 
    } 
} 

public static class Cloning 
{ 
    public static List<TValue> CloneListCloneValues<TValue>(List<TValue> original) where TValue : ICloneable 
    { 
     List<TValue> ret = new List<TValue>(original.Count); 

     foreach (TValue entry in original) 
     { 
      ret.Add((TValue)entry.Clone()); 
     } 

     return ret; 
    } 
} 

Надеюсь, это поможет другим, кто y борьба с аналогичными проблемами. Мне нравится TPL Dataflow и, в частности, очень помогли и побудили меня копать глубже. Спасибо, свик !!!