2017-01-27 9 views
0

Я использую .net реактивное расширение с потоком данных TPL. Вот мой трубопровод:Повторный поток данных tpl с/без Rx

Я получаю данные как поток от какого-то внешнего источника, а затем я преобразую точки данных с помощью потока данных TransformBlocks. После этого я использую буфер Rx для буферизации потоковых точек в течение 1 секунды и, наконец, я использую Actionflow Actionblock для размещения этих буферизованных точек данных в конечной точке REST.

Я хотел бы повторить операцию пост-пост REST на переходных ошибках. Где следует повторить попытку:

  1. После буфера?
  2. Внутренний блок действий?
  3. Что произойдет с непрерывной потоковой передачей во время повторной попытки? Я не хочу пропускать какие-либо данные.

ответ

1

Просто уходящий от обзора высокого уровня, я думаю, было бы проще повторить в финале ActionBlock. Ваш ActionBlock отправит и подтвердит успех до окончания. В зависимости от объема данных этот подход позволяет вам закрутить столько ActionBlock s, сколько необходимо, без излишней заботы о потерянных элементах, на самом деле их не должно быть. Если один или несколько экземпляров ActionBlock не удалили свой пост, ваши объекты будут по-прежнему передаваться и распространяться в отношении того, как вы настроили свои буферы и очереди ввода, ожидая их шанса на проводе.

Редактировать Просто некоторые псевдо-код, но это будет иметь партию точек данных, то IEnumerable, и стараться размещать их в пять раз. Ограниченная емкость заставит каждый экземпляр обработчика поставить в очередь 1000 партий, а параллелизм будет распределять партии между блоками действий. Необязательно, Buffer может быть добавлен до ActionBlock для хранения всех входящих партий. Вам нужно будет позаботиться о том, чтобы ваш продюсер, поток, не сильно запустил ваш потребитель, службу REST.

public void ConfigureFinalActionBlock() { 
     var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() { 
      BoundedCapacity = DataflowBlockOptions.Unbounded 
     }); 

     var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 1000, 
      MaxDegreeOfParallelism = Environment.ProcessorCount 
     }; 
     var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => { 
      var success = false; 
      var attempts = 0; 
      while (!success && attempts < 5) { 
       attempts++; 
       success = await MyApiPostAsync(data); 
      } 
     }, options); 

     dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() { 
      PropagateCompletion = true 
     }); 
+0

Возьмем этот случай. Я получил буфер 100000 datapoints, и я заново его запускаю в actionblock. Только в следующую секунду я получу еще 100 000 данных с запросом на отправку actionblock. Поэтому я могу пропустить некоторые моменты. Как установить несколько блоков действий с входными очередями. – Abhay

+0

Как вы можете пропустить какие-либо баллы? Буфер имеет неограниченную очередь, новые сообщения будут либо ждать обработанных, либо, если вы укажете максимальную степень параллелизма, будут выполняться одновременно в другом потоке. – VMAtm