2010-02-02 5 views
2

Я ищу хороший метод отслеживания (подсчета), с которым рабочие потерпели неудачу при постановке в очередь с помощью Threadpool и использования WaitHandle.WaitAll() для завершения всех потоков.Устойчивый метод отслеживания неработающих работников с ThreadPool

Является ли блокировка счетчика хорошей техникой или существует более надежная стратегия?

+0

Как вы видите нити отчетности свой статус? Было бы хорошо, если бы они просто установили 'bool' в' true' или 'false'? – jason

+0

Я думал, что порядок в очереди будет иметь статический счетчик –

+0

Я не следую; кажется, что счетчик полезен только для того, чтобы рассказать вам, сколько рабочих было успешным или неудавшимся, но не было, какие рабочие преуспели или потерпели неудачу. Что мне не хватает? – jason

ответ

1

Хорошо, вот подход, который вы могли бы предпринять. Я инкапсулировал данные, которые мы хотим отслеживать в классе TrackedWorkers. В этом классе есть конструктор, который позволяет вам установить, сколько рабочих будет работать. Затем рабочие запускаются с использованием LaunchWorkers, для чего требуется делегат, который ест object и возвращает bool. object представляет вход для рабочего, а bool представляет собой успех или неудачу в зависимости от true или false, являющихся возвращаемым значением, соответственно.

Итак, в основном, что мы делаем, у нас есть массив для отслеживания состояния штата. Мы запускаем рабочих и устанавливаем статус, соответствующий этому работнику, в зависимости от возвращаемого значения от работника. Когда рабочий возвращается, мы устанавливаем AutoResetEvent и WaitHandle.WaitAll для всех установленных AutoResetEvents.

Обратите внимание, что существует вложенный класс для отслеживания работы (делегата), которую должен выполнять рабочий, вход в эту работу и ID, используемый для установки статуса AutoResetEvent, соответствующего этому потоку.

Заметьте очень внимательно, что после завершения работы мы не проводим ссылки на делегата func и ни на input. Это важно, чтобы мы случайно не предотвратили сбор мусора.

Существуют методы получения статуса конкретного работника, а также все индексы успешных рабочих и все индексы рабочих, которые потерпели неудачу.

Последнее примечание: я не считаю это производство кода готовым. Это всего лишь эскиз подхода, который я бы взял. Вам нужно позаботиться о том, чтобы добавить тестирование, обработку исключений и другие подобные детали.

class TrackedWorkers { 
    class WorkerState { 
     public object Input { get; private set; } 
     public int ID { get; private set; } 
     public Func<object, bool> Func { get; private set; } 
     public WorkerState(Func<object, bool> func, object input, int id) { 
      Func = func; 
      Input = input; 
      ID = id; 
     } 
    } 

    AutoResetEvent[] events; 
    bool[] statuses; 
    bool _workComplete; 
    int _number; 

    public TrackedWorkers(int number) { 
     if (number <= 0 || number > 64) { 
      throw new ArgumentOutOfRangeException(
       "number", 
       "number must be positive and at most 64" 
      ); 
     } 
     this._number = number; 
     events = new AutoResetEvent[number]; 
     statuses = new bool[number]; 
     _workComplete = false; 
    } 

    void Initialize() { 
     _workComplete = false; 
     for (int i = 0; i < _number; i++) { 
      events[i] = new AutoResetEvent(false); 
      statuses[i] = true; 
     } 
    } 

    void DoWork(object state) { 
     WorkerState ws = (WorkerState)state; 
     statuses[ws.ID] = ws.Func(ws.Input); 
     events[ws.ID].Set(); 
    } 

    public void LaunchWorkers(Func<object, bool> func, object[] inputs) { 
     Initialize(); 
     for (int i = 0; i < _number; i++) { 
      WorkerState ws = new WorkerState(func, inputs[i], i); 
      ThreadPool.QueueUserWorkItem(this.DoWork, ws); 
     } 
     WaitHandle.WaitAll(events); 
     _workComplete = true; 
    } 

    void ThrowIfWorkIsNotDone() { 
     if (!_workComplete) { 
      throw new InvalidOperationException("work not complete"); 
     } 
    } 

    public bool GetWorkerStatus(int i) { 
     ThrowIfWorkIsNotDone(); 
     return statuses[i]; 
    } 

    public IEnumerable<int> SuccessfulWorkers { 
     get { 
      return WorkersWhere(b => b); 
     } 
    } 

    public IEnumerable<int> FailedWorkers { 
     get { 
      return WorkersWhere(b => !b); 
     } 
    } 

    IEnumerable<int> WorkersWhere(Predicate<bool> predicate) { 
     ThrowIfWorkIsNotDone(); 
     for (int i = 0; i < _number; i++) { 
      if (predicate(statuses[i])) { 
       yield return i; 
      } 
     } 
    } 
} 

Пример использования:

class Program { 
    static Random rg = new Random(); 
    static object lockObject = new object(); 
    static void Main(string[] args) { 
     int count = 64; 
     Pair[] pairs = new Pair[count]; 
     for(int i = 0; i < count; i++) { 
      pairs[i] = new Pair(i, 2 * i); 
     } 
     TrackedWorkers workers = new TrackedWorkers(count); 
     workers.LaunchWorkers(SleepAndAdd, pairs.Cast<object>().ToArray()); 
     Console.WriteLine(
      "Number successful: {0}", 
      workers.SuccessfulWorkers.Count() 
     ); 
     Console.WriteLine(
      "Number failed: {0}", 
      workers.FailedWorkers.Count() 
     ); 
    } 
    static bool SleepAndAdd(object o) { 
     Pair pair = (Pair)o; 
     int timeout; 
     double d; 
     lock (lockObject) { 
      timeout = rg.Next(1000); 
      d = rg.NextDouble(); 
     } 
     Thread.Sleep(timeout); 
     bool success = d < 0.5; 
     if (success) { 
      Console.WriteLine(pair.First + pair.Second); 
     } 
     return (success); 

    } 
} 

выше программа собирается запустить шестьдесят четыре темы. i-й поток имеет задачу добавить числа i и 2 * i и распечатать результат на консоли. Тем не менее, я добавил случайное количество сна (менее одной секунды), чтобы имитировать занятость, и я переворачиваю монету, чтобы определить успех или неудачу нити. Те, кто преуспевают, печатают сумму, с которой им было поручено, и возвращают true. Те, которые не распечатывают ничего и возвращают false.

Здесь я использовал

struct Pair { 
    public int First { get; private set; } 
    public int Second { get; private set; } 
    public Pair(int first, int second) : this() { 
     this.First = first; 
     this.Second = second; 
    } 
} 
+0

+5 для ваших усилий. Я думаю, что я, возможно, неправильно сформулировал этот вопрос или получаю больше ответов. Одно незначительное изменение - делегаты, а не лямбда –

+0

Я буду перефразировать это, поскольку лямбда в порядке. Являются ли события плохой альтернативой? Я понимаю ограничение WorkItem, но существует ли ограничение относительно количества элементов, которые вы ввели в threadpool? Извините, если я морфирую это на 2 других вопроса –

+0

@Chris S: Ну, вопрос был немного неясным; вы оставили несколько вещей неуточненными. Это помогает при задании вопросов как можно точнее, без педантичности, конечно. Тем не менее, мы прояснили некоторые моменты в комментариях к вашему вопросу. Что вы подразумеваете под «событиями, плохой альтернативой»? Как механизм сигнализации? Что касается предела, существует ограничение на количество «AutoResetEvent», на которое можно ждать. Если вам нужно больше потоков, вам нужно будет выполнить некоторую работу, чтобы разделить их между несколькими коллекциями «AutoResetEvent». – jason