2016-12-16 16 views
1

В настоящее время я работаю над потоком данных о трубопроводе, где каждый этап, за исключением этапа 1, является потребителем и производителем async. У меня есть объекты, «текущие» по моему конвейеру, какие ссылочные позиции. На третьем этапе я хотел бы создать цикл и буфера всех объектов, которые отвечают специальному условию (Stage Loop).TPL Dataflow - условные циклы

Если новые объекты входят (этап 3), в то время как есть другие объекты, которые в настоящее время буферизованы (этап цикла), я хотел бы проверить, совпадают ли они в их ссылочном элементе, и если они отправляют их в BufferBlock этапа цикла.

Вопрос в том, как я могу проверить ссылочный элемент всех объектов цикла сцены из этапа 3?

Трубопровод своего рода выглядит следующим образом:

Incoming objects -> 
    BufferBlock1 -> Parsing (Stage2) -> 
    BufferBlock2 -> Processing (Stage3) -> 
    BufferBlock3 -> Stage Loop -> 
    Back to BufferBlock 2 

ответ

0

Вы действительно не нужно, что многие BufferBlock в твоей цепи. TPL Dataflow содержит TransformBlock, который инкапсулирует логику BufferBloсk и ActionBlock и имеет выходной блок для обработанных сообщений.

Что касается петли, вы можете связать блоки между друг с другом with static extension method, так что это может быть, выглядит как

stage2.LinkTo(stage3, CheckForExistingProcessing); 
stage2.LinkTo(stage4); 

Jere stage4 это очереди сообщений, которые не прошли проверку и должны быть обработаны в петле. Вы можете настроить дополнительные ActionBlock, или, может быть, просто используйте TransformBlock для отправки сообщений снова на соответствующий этап. Я думаю, что вы также можете ввести проверку повторных попыток, поскольку некоторые сообщения, вероятно, не могут быть обработаны вообще так несколько причин.

Кроме того, как вы сказали, что у вас есть async логики, вы, вероятно, должны SendAsync сообщения, а не Post их (вы также можете использовать перегрузку с CancellationToken):

// asynchronously wait for a sending with resending attempts 
await stage1.SendAsync(m); 
// asynchronously wait for a sending with resending attempts with possible cancellation 
await stage2.SendAsync(m, token); 

Post метод является синхронным и капли сообщений, если они не принимаются мишенью, сравнивая метод SendAsync, который пытается доставить сообщение, даже если цель не может принять его прямо сейчас.

+0

Я думаю, что я столкнулся с проблемой времени, в которую входит объект1, будет отклонен внутри CheckForExistingProcessing и будет перенаправлен на stage4, в то время как object2 входит и передает Check, потому что элемент ссылки обоих объектов был выпущен из его заблокировать его. Поэтому мне нужно будет проверить, есть ли объекты в stage4, которые ссылаются на один и тот же элемент, прежде чем делать чек. Мне нужна какая-то очередь, которая откладывает объекты на основе проверки блокировки и если уже заблокированные элементы для одного и того же элемента), но сохраняет объект до тех пор, пока блокировка не будет доступна снова. – Peter

+0

Затем вы можете попробовать BlockingCollection на этом шаге. – VMAtm

+0

Мне удалось исключить цикл и блокировку из моего конвейера, и я уже создал для него тестовый класс. Один вопрос, если мне нужно импортировать файлы в том порядке, в котором они входят, и я создал конвейер на основе «одного файла за раз». Как я могу убедиться, что он будет обработан в том порядке, в котором он входит, если я вызову SendAsync? я мог бы добавить BufferBlock перед конвейером, где его потребитель обязательно сохранит порядок, но есть ли более разумный способ? – Peter

 Смежные вопросы

  • Нет связанных вопросов^_^