2009-04-29 6 views
17

Я ищу реализацию потоковой блокировки для .NET. «Потоковая блокирующая очередь» Я имею в виду: - потокобезопасный доступ к очереди, в которой метод метода Dequeue блокирует поток до тех пор, пока другой поток не поместит (Enqueue) некоторое значение.Реализация очереди блокировки потоков в .NET.

К моменту, когда I'v нашёл этот: http://www.eggheadcafe.com/articles/20060414.asp (Но это для .NET 1.1).

Может кто-то прокомментировать/критиковать правильность этой реализации. Или предложите другой. Спасибо заранее.

ответ

8

Как насчет этого Creating a blocking Queue in .NET?

Если вам это нужно для .NET 1.1 (я не был уверен в этом вопросе), просто отбросьте дженерики и замените T на object.

+0

Спасибо за ссылку. Странно, что я не нашел эту тему, ища «очередь». Хорошо, эта тема и моя суть примерно то же самое. кстати: Мне не нужно портировать в .net 1.1. Решение по смежной теме очень похоже на http://www.eggheadcafe.com/articles/20060414.asp – Shrike

+1

@Shrike, не странно. Еще один пример того, насколько плохим является поиск StackOverflow. Это так плохо, что каждый скажет вам просто использовать Google (и команду «сайт:»). – Ash

+0

@Ash: это, вероятно, правда ... это, безусловно, как я ищу stackoverflow ;-p –

0

Queue.Synchronized http://msdn.microsoft.com/en-us/library/system.collections.queue.synchronized(VS.71).aspx

является отправной точкой в ​​любом случае, я никогда не использовал блокировку очереди. Извините за несовместимое сообщение.

+1

Я думаю, что это просто потокобезопасная очередь, которая немного отличается от того, что он задает. – tylerl

+1

Да, я видел это после того, как отправил. Отредактировано для отражения –

+0

Использует ли метод Dequeue обертки, возвращаемый Queue.Synchronized() блокирует текущий поток? № Представьте, что у вас есть два потока, один из которых помещает значения в очередь, а другой - из этой очереди. Второй поток должен объединить очередь в цикл, потребляющий процессор. Это плохо. – Shrike

0

Microsoft имеет довольно хороший образец об этом:

//Copyright (C) Microsoft Corporation. All rights reserved. 

using System; 
using System.Threading; 
using System.Collections; 
using System.Collections.Generic; 

// The thread synchronization events are encapsulated in this 
// class to allow them to easily be passed to the Consumer and 
// Producer classes. 
public class SyncEvents 
{ 
    public SyncEvents() 
    { 
     // AutoResetEvent is used for the "new item" event because 
     // we want this event to reset automatically each time the 
     // consumer thread responds to this event. 
     _newItemEvent = new AutoResetEvent(false); 

     // ManualResetEvent is used for the "exit" event because 
     // we want multiple threads to respond when this event is 
     // signaled. If we used AutoResetEvent instead, the event 
     // object would revert to a non-signaled state with after 
     // a single thread responded, and the other thread would 
     // fail to terminate. 
     _exitThreadEvent = new ManualResetEvent(false); 

     // The two events are placed in a WaitHandle array as well so 
     // that the consumer thread can block on both events using 
     // the WaitAny method. 
     _eventArray = new WaitHandle[2]; 
     _eventArray[0] = _newItemEvent; 
     _eventArray[1] = _exitThreadEvent; 
    } 

    // Public properties allow safe access to the events. 
    public EventWaitHandle ExitThreadEvent 
    { 
     get { return _exitThreadEvent; } 
    } 
    public EventWaitHandle NewItemEvent 
    { 
     get { return _newItemEvent; } 
    } 
    public WaitHandle[] EventArray 
    { 
     get { return _eventArray; } 
    } 

    private EventWaitHandle _newItemEvent; 
    private EventWaitHandle _exitThreadEvent; 
    private WaitHandle[] _eventArray; 
} 

// The Producer class asynchronously (using a worker thread) 
// adds items to the queue until there are 20 items. 
public class Producer 
{ 
    public Producer(Queue<int> q, SyncEvents e) 
    { 
     _queue = q; 
     _syncEvents = e; 
    } 
    public void ThreadRun() 
    { 
     int count = 0; 
     Random r = new Random(); 
     while (!_syncEvents.ExitThreadEvent.WaitOne(0, false)) 
     { 
      lock (((ICollection)_queue).SyncRoot) 
      { 
       while (_queue.Count < 20) 
       { 
        _queue.Enqueue(r.Next(0, 100)); 
        _syncEvents.NewItemEvent.Set(); 
        count++; 
       } 
      } 
     } 
     Console.WriteLine("Producer thread: produced {0} items", count); 
    } 
    private Queue<int> _queue; 
    private SyncEvents _syncEvents; 
} 

// The Consumer class uses its own worker thread to consume items 
// in the queue. The Producer class notifies the Consumer class 
// of new items with the NewItemEvent. 
public class Consumer 
{ 
    public Consumer(Queue<int> q, SyncEvents e) 
    { 
     _queue = q; 
     _syncEvents = e; 
    } 
    public void ThreadRun() 
    { 
     int count = 0; 
     while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1) 
     { 
      lock (((ICollection)_queue).SyncRoot) 
      { 
       int item = _queue.Dequeue(); 
      } 
      count++; 
     } 
     Console.WriteLine("Consumer Thread: consumed {0} items", count); 
    } 
    private Queue<int> _queue; 
    private SyncEvents _syncEvents; 
} 

public class ThreadSyncSample 
{ 
    private static void ShowQueueContents(Queue<int> q) 
    { 
     // Enumerating a collection is inherently not thread-safe, 
     // so it is imperative that the collection be locked throughout 
     // the enumeration to prevent the consumer and producer threads 
     // from modifying the contents. (This method is called by the 
     // primary thread only.) 
     lock (((ICollection)q).SyncRoot) 
     { 
      foreach (int i in q) 
      { 
       Console.Write("{0} ", i); 
      } 
     } 
     Console.WriteLine(); 
    } 

    static void Main() 
    { 
     // Configure struct containing event information required 
     // for thread synchronization. 
     SyncEvents syncEvents = new SyncEvents(); 

     // Generic Queue collection is used to store items to be 
     // produced and consumed. In this case 'int' is used. 
     Queue<int> queue = new Queue<int>(); 

     // Create objects, one to produce items, and one to 
     // consume. The queue and the thread synchronization 
     // events are passed to both objects. 
     Console.WriteLine("Configuring worker threads..."); 
     Producer producer = new Producer(queue, syncEvents); 
     Consumer consumer = new Consumer(queue, syncEvents); 

     // Create the thread objects for producer and consumer 
     // objects. This step does not create or launch the 
     // actual threads. 
     Thread producerThread = new Thread(producer.ThreadRun); 
     Thread consumerThread = new Thread(consumer.ThreadRun); 

     // Create and launch both threads.  
     Console.WriteLine("Launching producer and consumer threads...");   
     producerThread.Start(); 
     consumerThread.Start(); 

     // Let producer and consumer threads run for 10 seconds. 
     // Use the primary thread (the thread executing this method) 
     // to display the queue contents every 2.5 seconds. 
     for (int i = 0; i < 4; i++) 
     { 
      Thread.Sleep(2500); 
      ShowQueueContents(queue); 
     } 

     // Signal both consumer and producer thread to terminate. 
     // Both threads will respond because ExitThreadEvent is a 
     // manual-reset event--so it stays 'set' unless explicitly reset. 
     Console.WriteLine("Signaling threads to terminate..."); 
     syncEvents.ExitThreadEvent.Set(); 

     // Use Join to block primary thread, first until the producer thread 
     // terminates, then until the consumer thread terminates. 
     Console.WriteLine("main thread waiting for threads to finish..."); 
     producerThread.Join(); 
     consumerThread.Join(); 
    } 
} 
0

Пожалуйста, имейте в виду, что блокировка в код вызова может быть лучшим вариантом, если у вас есть полный контроль над ним. Рассмотрите возможность доступа к вашей очереди в цикле: вы будете бесполезно приобретать блокировки несколько раз, что может привести к снижению производительности.

1

Пример Microsoft является хорошим, но он не инкапсулирован в класс. Кроме того, это требует, чтобы потребительский поток работал в MTA (из-за вызова WaitAny). Есть несколько случаев, когда вам может потребоваться запустить STA (например, если вы выполняете COM-взаимодействие). В этих случаях WaitAny не может быть использован.

У меня есть простая блокировка класс очереди, которая преодолевает эту проблему здесь: http://element533.blogspot.com/2010/01/stoppable-blocking-queue-for-net.html

20

Для справки .NET 4 вводит System.Collections.Concurrent.BlockingCollection<T> типа для решения этого. Для неблокирующей очереди вы можете использовать System.Collections.Concurrent.ConcurrentQueue<T>. Обратите внимание, что ConcurrentQueue<T>, вероятно, будет использоваться в качестве базового хранилища данных для BlockingCollection<T> для использования OP.

+0

Не могли бы вы предоставить образец кода, который добавит пару методов, чтобы мы могли видеть это в действии? – theJerm

 Смежные вопросы

  • Нет связанных вопросов^_^