У меня есть настройка блока потока данных производителя/потребителя с использованием BufferBlock и ActionBlock, и он отлично работает в консольном приложении;Как реализовать непрерывно выполняемые блоки потока данных в TPL?
После добавления всех предметов в BurfferBlock и связывания BufferBlock с другими элементами действия; он работает хорошо.
Теперь я хочу использовать эту внутреннюю службу, где этот конвейер блока потока данных всегда будет вверх, и когда сообщения будут доступны через внешние события, он войдет в буферный блок и начнет обработку. Как я могу добиться этого?
До сих пор я сделал ниже:
public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
BufferBlock = new BufferBlock<WorkItem>();
GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);
ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);
TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite);
logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
return WorkItem;
}, new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
BatchBlock.LinkTo(ProcessItems);
TimingBlock.LinkTo(BatchBlock);
BufferBlock.LinkTo(TimingBlock);
}
Почему бы вам просто не сделать это? Что вы пробовали и как это случилось? – svick
@svick Я добавил то, что я реализовал до сих пор – user2757350
Итак, в чем проблема? Работает ли этот код так, как вы ожидаете? Что мешает вам публиковать какие-либо события в этом конвейере? – svick