2013-12-02 2 views
4

У меня есть настройка блока потока данных производителя/потребителя с использованием BufferBlock и ActionBlock, и он отлично работает в консольном приложении;Как реализовать непрерывно выполняемые блоки потока данных в TPL?

После добавления всех предметов в BurfferBlock и связывания BufferBlock с другими элементами действия; он работает хорошо.

Теперь я хочу использовать эту внутреннюю службу, где этот конвейер блока потока данных всегда будет вверх, и когда сообщения будут доступны через внешние события, он войдет в буферный блок и начнет обработку. Как я могу добиться этого?

До сих пор я сделал ниже:

public void SetupPipeline() 
{ 

     FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

     BufferBlock = new BufferBlock<WorkItem>(); 

     GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions(); 
     GroupingDataflowBlockOptions.Greedy = true; 
     GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded; 
     CancellationTokenSource = new CancellationTokenSource(); 
     CancellationToken = CancellationTokenSource.Token; 
     GroupingDataflowBlockOptions.CancellationToken = CancellationToken; 
     BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions); 

     ProcessItems = new ActionBlock<WorkItem[]>(WorkItems => 
      ProcessWorkItems(WorkItems.ToList<WorkItem>()), 
      new ExecutionDataflowBlockOptions 
      { 
       CancellationToken = CancellationToken 
      }); 

     Timer = new Timer(_ => 
       BatchBlock.TriggerBatch() 
      ); 

     TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem => 
     { 
      Timer.Change(TimerInterval, Timeout.Infinite); 
      logger.Debug("Inside TimingBlock : " + WorkItem.ToString()); 
      return WorkItem; 
     }, new ExecutionDataflowBlockOptions 
     { 
      CancellationToken = CancellationToken 
     }); 

     BatchBlock.LinkTo(ProcessItems); 
     TimingBlock.LinkTo(BatchBlock); 
     BufferBlock.LinkTo(TimingBlock); 
} 
+1

Почему бы вам просто не сделать это? Что вы пробовали и как это случилось? – svick

+0

@svick Я добавил то, что я реализовал до сих пор – user2757350

+0

Итак, в чем проблема? Работает ли этот код так, как вы ожидаете? Что мешает вам публиковать какие-либо события в этом конвейере? – svick

ответ

1

Размер партии определяется переменной «BoundingCapacity» в конструкторе batchblock. Партия будет размещена в следующих случаях:

  • ряд сообщений, равных размера партии были получены (определены конструктором)
  • Пакетный блок помечается для завершения
  • Метод triggerbatch называется

Кажется, что вы хотите, чтобы партия отправлялась после того, как был достигнут размер ванны или произошел тайм-аут. Если это так, и если размер партии не является критическим, я бы просто добавил повторяющийся интервал к таймеру, который у вас есть, и сделайте объект, расположенный ниже по потоку от блока batchblock, игнорировать пустые записи.

Что вы можете на самом деле хотеть, а что больше всего связано с философией программирования потока данных, заключается в создании нового пакетного блока, когда вы начинаете публикацию ряда элементов, а затем завершаете его, когда это делается или когда происходит тайм-аут.Новые сообщения будут создавать новый пакетный блок, если он еще не существует.

Проблема с попыткой реализовать таймер тайм-аута вокруг пакетного блока, который срабатывает только с первого триггера, заключается в том, что вам нужно будет подсчитывать и проверять сообщения в буферном блоке, или вам нужно будет просматривать сообщения из буфера. Оба этих сценария создадут много уродства и/или нарушают инкапсуляцию блоков.

+1

Спасибо за ваши предложения; Я хочу избегать создания пакетного блока каждый раз. Как вы видите, моя программа в основном преобразует частые сообщения в кусок, используя буфер. Буфер отлично работает с BoundingCapacity; в моем случае я установил его на 100. Но я не хочу ждать, пока не поступит все 100 сообщений. Я хочу двойной контроль над BufferBlock; например если есть 100 сообщений или 5 секунд (оба могут быть настроены). Решения работают в соответствии с моей потребностью, но я хотел бы посмотреть, есть ли у кого-то еще лучшие решения. Они здесь, я хочу, чтобы BufferBlock вел себя в двойном режиме; при достижении BoundinCapacity и времени ожидания. – user2757350

+0

Как только вы передаете элемент в блок потока данных, вам необходимо не знать об этом элементе. Предполагается, что блоки потока данных управляются данными. Внешнее управление неодобрительно. Я бы честно изменил то, что у вас есть, чтобы «тайм-аут» и пакетный блок были заключены в один IPropagatingBlock. – VoteCoffee

+1

@VeteCoffee - это имеет смысл; это будет разделять блоки, и я могу заменить что-то еще в будущем ... Спасибо – user2757350

0

В упрощении, DataFlow является способом обработки кучи объектов, используя набор методов. Он не предусматривает и не ожидает какого-либо конкретного способа создания этих объектов.

Если вы хотите, чтобы трубопровод оставался в живых, просто не завершайте приложение. Если вы не хотите использовать консольное приложение, создайте службу, которая строит конвейер и отправляет ему объекты до его закрытия.

Сообщения - это просто объекты, которые вы создадите, читая данные, в ответ на события (независимо от того, что это означает) или любым другим способом.

Что касается внешних событий, что вы подразумеваете под этим? Что кто-то отправит данные в ваше приложение? Есть много способов это может произойти:

  • Если данные поступают из другого консольного приложения, вы можете передать результаты одного приложения к другому, анализировать данные, поступающие из входного потока приложения командной строки, создать сообщений и передать их в конвейер
  • Если вы хотите прослушивать запросы для служб, вы можете разместить службу .NET Pipe, WCF или Web API для прослушивания вызовов и передачи опубликованных данных в конвейер.
  • Если данные поступают из базы данных, вы можете опросить изменения и отправить любые измененные данные в конвейер.

Дело в том, что Dataflow предназначен для обработки данных, а не для прослушивания событий. Это не полная система распределенных агентов, если это то, что вы искали.

+0

Внешние события, т. е. у меня есть обновления уведомлений, поступающие с разных каналов; который я отправляю на BufferBlock. У меня есть обслуживание 24/7 и запуск, который прослушивает эти входящие обновления местоположения и основывается на том, что делает некоторую обработку потокового потока ... – user2757350