2016-12-02 6 views
1

Я новичок в TPL Dataflow и я ищу конструкцию, которая позволит разбить список исходных сообщений для равномерно распределенных параллельной обработки при сохранении порядка сообщений сообщений через отдельные конвейеры. Существует ли конкретный блок или концепция в API данных DataFlow, который может быть использован для достижения этого, или это более важный вопрос, обеспечивающий код клея или пользовательские блоки между существующими блоками?Как маршрутизировать, группировать или иным образом разделить сообщения на согласованные наборы с использованием потока данных TPL

Для тех, кто знаком с Akka.NET Я ищу функциональность, похожую на ConsistentHashing router, которая позволяет отправлять сообщения одному маршрутизатору, который затем пересылает эти сообщения на отдельные маршруты, которые нужно обрабатывать.

Синхронный пример:

var count = 100000; 
var processingGroups = 5; 
var source = Enumerable.Range(1, count); 

// Distribute source elements consistently and evenly into a specified set of groups (ex. 5) so that. 
var distributed = source.GroupBy(s => s % processingGroups); 

// Within each of the 5 processing groups go through each item and add 1 to it 
var transformed = distributed.Select(d => d.Select(i => i + 3).ToArray()); 

List<int[]> result = transformed.ToList(); 
Check.That(result.Count).IsEqualTo(processingGroups); 
for (int i = 0; i < result.Count; i++) 
{ 
    var outputGroup = result[i]; 

    var expectedRange = Enumerable.Range(i + 1, count/processingGroups).Select((e, index) => e + (index * (processingGroups - 1)) + 3); 
    Check.That(outputGroup).ContainsExactly(expectedRange); 
} 

ответ

1

В общем, я не думаю, что вы ищете, готовые в DataFlow, как это может быть с ConsistentHashing маршрутизатором. Однако, добавив идентификатор к части данных, которые вы хотите передать, вы можете обрабатывать их в любом порядке параллельно и переупорядочивать их при завершении обработки.

public class Message { 
     public int MessageId { get; set; } 
     public int GroupId { get; set; }   
     public int Value { get; set; } 
    } 

    public class MessageProcessing 
    { 
     public void abc() { 
      var count = 10000; 
      var groups = 5; 
      var source = Enumerable.Range(0, count); 

      //buffer all input 
      var buffer = new BufferBlock<IEnumerable<int>>(); 

      //split each input enumerable into processing groups 
      var messsageProducer = new TransformManyBlock<IEnumerable<int>, Message>(ints => 
      ints.Select((i, index) => new Message() { MessageId = index, GroupId = index % groups, Value = i }).ToList()); 

      //process each message, one action block may process any group id in any order 
      var processMessage = new TransformBlock<Message, Message>(msg => 
      { 
       msg.Value++; 
       return msg; 
      }, new ExecutionDataflowBlockOptions() { 
       MaxDegreeOfParallelism = groups 
      }); 

      //output of processed message values 
      int[] output = new int[count]; 

      //insert messages into array in the order the started in 
      var regroup = new ActionBlock<Message>(msg => output[msg.MessageId] = msg.Value, 
       new ExecutionDataflowBlockOptions() { 
        MaxDegreeOfParallelism = 1 
       }); 
     }   

    } 

В примере GroupID из сообщения не используется, но он может быть использован в более полном примере для координации группы сообщений. Кроме того, обработка последующих сообщений в буферный блок может быть выполнена путем изменения выходного массива в список и настройки соответствующего элемента списка каждый раз, когда перечислимый из целых чисел отправляется в буферный блок. В зависимости от вашего точного использования вам может потребоваться поддержка нескольких пользователей на выходе, и это можно отложить обратно в поток.

0

Вы можете динамически создать конвейер с linking the blocks between each other основанным на предикате:

var count = 100; 
var processingGroups = 5; 
var source = Enumerable.Range(1, count); 

var buffer = new BufferBlock<int>(); 
var consumer1 = new ActionBlock<int>(i => { }); 
var consumer2 = new ActionBlock<int>(i => { }); 
var consumer3 = new ActionBlock<int>(i => { }); 
var consumer4 = new ActionBlock<int>(i => { Console.WriteLine(i); }); 
var consumer5 = new ActionBlock<int>(i => { }); 

buffer.LinkTo(consumer1, i => i % 5 == 1); 
buffer.LinkTo(consumer2, i => i % 5 == 2); 
buffer.LinkTo(consumer3, i => i % 5 == 3); 
buffer.LinkTo(consumer4, i => i % 5 == 4); 
buffer.LinkTo(consumer5); 

foreach (var i in source) 
{ 
    buffer.Post(i); 
    // consider async option if you able to do it 
    // await buffer.SendAsync(i); 
} 
buffer.Complete(); 
Console.ReadLine(); 

Код выше будет писать только цифры от 4 группы, обрабатывая другие группы молча, но я надеюсь, что вы получили эту идею. Существует общая практика связывания блока по крайней мере одного потребителя без фильтрации сообщений, которые не удаляются, если они не принимаются ни одним из потребителей, и вы можете сделать это, если у вас нет обработчика по умолчанию (NullTarget<int> просто игнорирует все сообщения он получил):

buffer.LinkTo(DataflowBlock.NullTarget<int>()); 

недостаток этого является продолжением своих преимуществ: вы должны предоставить предикаты, так как нет никаких встроенных структур для этого. Однако это все еще можно сделать.