2013-03-06 2 views
6

Я работаю над проектом со следующим рабочим процессом:Дождитесь блокировки коллекции (очереди) для уменьшения размера в C#

Часть первая:

  • Событие прибывает асинхронно и очередями в блокировании очередь, мы будем называть это Q1
  • Thread подхватывает следующий доступный элемент из этой очереди
  • Пункт заканчивается работает {N} ряд задач параллельно
  • Каждое задание ставит в очередь его результат во второй очереди, мы будем называть это Q2.
  • При обработке финишных элементов следующий элемент считывается из очереди.

Часть вторая:

  • Другой поток считывает из Q2 один объект в то время, и работает на результат

Итак, проблема здесь есть, каждый элемент в первой очереди заканчивается выполнение большого количества задач параллельно, и каждая задача ставит в очередь свой результат. Вторая очередь должна обрабатываться поочередно, по одному пункту за раз, и она наводняется.


Мой вопрос

мне нужен механизм, который сделает обработку резьбы Q1 ждать, пока количество элементов в Q2 не ниже определенного порогу. Каков наилучший способ достичь этого? Есть ли способ иметь решение, основанное на событиях, а не решение опроса?

ответ

7

Вместо использования Queue<T> вы можете использовать BlockingCollection<T> для Q2. Если вы установите BoundedCapacity, вызовы Q2.Add() будут блокироваться при достижении емкости. Это автоматически закроет обработку Q1, так как задачи N начнут блокироваться, если они не смогут добавить в конечную очередь.

+0

Cool; Я уже использую блокирующие коллекции здесь, так что это незначительные изменения :) –

+0

Я попробую и приму, если это сработает. Это могло бы быть хорошим решением, так как это прекратило бы обработку потоков задач (N), пока они не смогли бы поставить очередь на результат, что остановило бы больше событий от обработки в первую очередь :) –

2

Я предполагаю, что вы получаете данные в случайных наводнениях, с длинными засухами, во время которых Q2 может догнать. Рассматривали ли вы просто ограничение количества параллельных потоков, порожденных из Q1, используя ограниченный пул потоков для этих задач?

Я подозреваю, что вы можете воспользоваться несколькими пулами потоков, если размер задания легко определяется по прибытии. У вас может быть небольшое количество потоков для обработки больших заданий, а также большое количество потоков, готовых обрабатывать небольшие задания. Даже третья промежуточная очередь может оказаться полезной.

+0

Мы уже используем пул потоков. Проблема в том, что довольно сложно судить о том, сколько потоков использовать. Иногда мы получаем 80 тыс. Событий, которые все маленькие, и система в порядке (последняя очередь не накапливается, потому что наборы результатов мало или быстро загружаются в наш распределенный кеш). В других случаях мы получим 100 событий, и они будут зависать, потому что наборы результатов являются массивными, и загрузка выполняется навсегда. Таким образом, ограничение пула прямо заставляет его работать хуже, когда наборы результатов малы, но решает проблему затопления, когда они слишком большие. +1 хотя это хорошее решение, которое я дал :) –

1

Ваша проблема кажется прекрасным примером для решения библиотеки TPL Dataflow.Если вы готовы попробовать, вот как она могла бы работать (это очень простой пример, конечно):

TransformBlock<int, bool> transform = new TransformBlock<int, bool>(i => i > 5 ? true : false, 
      new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); 
ActionBlock<bool> resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b), 
      new ExecutionDataflowBlockOptions { BoundedCapacity = 10 }); 
transform.LinkTo(resultBlock); 

Вы определяете блок, который сделает ваше преобразование (это работает как ваш Q1) преобразования, вам может установить уровень параллелизма в число задач, которые вы хотите использовать.

Затем вы создаете второй блок (работающий как ваш Q2), который будет иметь BoundedCapacity, и он обрабатывает каждое сообщение синхронно, вызывая действие для каждого элемента. Этот блок может быть заменен любым другим, например BufferBlock, который позволит вам опросить его по запросу.

+0

У меня нет времени играть с этим сейчас, но это кажется интересным; Я попробую еще раз :) Спасибо человеку. –