2013-09-05 2 views
1

Я хочу, чтобы два потока работали с одной очередью. Первый поток следует вызывать каждые 2 секунды, а второй поток - каждые 3 секунды. Оба потока должны запускаться одновременно. У меня проблема при доступе к первому элементу очереди. Оба потока принимают элемент с индексом 0. Иногда это происходит с другими элементами очереди, а не только с первым элементом. У меня есть такой выход на консоли:ConcurrentQueue один элемент берет на себя двумя потоками

  • позицию 0, обработанный 1 Время: 3: 27: 8
  • Пункт 0, обработанного 2 Время: 3: 27: 8
  • Пункт 2, обработанный 1 Время: 3:27:10
  • Пункт 3 обрабатывается 2 Время: 3:27:11
  • Пункт 4, обработанного 1 Время: 3:27:12

и так далее ..

Вот код, я использую:

ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>(); 

    for (int i = 0; i < 10; i++) 
    { 
     sharedQueue.Enqueue(i); 
    } 


    int itemCount= 0; 


    Task[] tasks = new Task[2]; 
    for (int i = 0; i < tasks.Length; i++) 
    { 
     // create the new task 
     tasks[i] = new Task(() => 
     { 
      while (sharedQueue.Count > 0) 
      { 
       // define a variable for the dequeue requests 
       int queueElement; 
       // take an item from the queue 
       bool gotElement = sharedQueue.TryDequeue(out queueElement); 
       // increment the count of items processed 
       if (gotElement) 
       { 
        DateTime dt = DateTime.Now; 
        Console.WriteLine("Item " + itemCount + "processed by " 
         + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second); 
        Interlocked.Increment(ref itemCount); 
       if (Task.CurrentId == 1) 
        Thread.Sleep(2000); 
       else 
        Thread.Sleep(3000);      
       } 

      } 
     }); 
     // start the new task 
     tasks[i].Start(); 


    } 
    // wait for the tasks to complete 
    Task.WaitAll(tasks); 
    // report on the number of items processed 
    Console.WriteLine("Items processed: {0}", itemCount); 
    // wait for input before exiting 
    Console.WriteLine("Press enter to finish"); 
    Console.ReadLine(); 
} 

ответ

4

Замените следующую строку:

Console.WriteLine("Item " + itemCount + "processed by " ...); 

С этой линией:

Console.WriteLine("Item " + queueElement + "processed by " ...); 

Проблема вы видите, вероятно, из-за задачи, выполняющие Console.WriteLine почти одновременно, и оба имеют одинаковое значение itemCount, потому что они чередуются таким образом, что звонки Interlocked.Increment еще не произошли. Вероятно, имеет смысл распечатать queueElement в любом случае, поскольку это более значимо.

+1

+1. Более того, 'itemCount' не имеет смысла, потому что Thread A может получить первый элемент, Thread B получает второй, но затем Thread B увеличивает« itemCount »первым. Так что хотя Thread B действительно получил пункт 2, он сообщил бы, что получил предмет 1. –

+0

@JimMischel: Ха ... да, я даже об этом не думал! –

4

См. Brian Gideon's excellent answer относительно вашего itemCount проблема.

Возможно, вы захотите переписать код для использования BlockingCollection, а не ConcurrentQueue<T>. С этим работать гораздо проще. BlockingCollection - это обертка для параллельных коллекций. В своей конфигурации по умолчанию резервное хранилище является ConcurrentQueue. Таким образом, вы получаете те же функции параллельной очереди, но с гораздо более приятным интерфейсом.

BlockingCollection<int> sharedQueue = new BlockingCollection<int>(); 

for (int i = 0; i < 10; i++) 
{ 
    sharedQueue.Add(i); 
} 

// CompleteAdding marks the queue as "complete for adding," 
// meaning that no more items will be added. 
sharedQueue.CompleteAdding(); 

int itemCount= 0; 

Task[] tasks = new Task[2]; 
for (int i = 0; i < tasks.Length; i++) 
{ 
    // create the new task 
    tasks[i] = new Task(() => 
    { 
     foreach (var queueElement in sharedQueue.GetConsumingEnumerable()) 
     { 
      DateTime dt = DateTime.Now; 
      Console.WriteLine("Item " + itemCount + "processed by " 
       + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second); 
      Interlocked.Increment(ref itemCount); 
      if (Task.CurrentId == 1) 
       Thread.Sleep(2000); 
      else 
       Thread.Sleep(3000);      
     } 
    }); 

    // start the new task 
    tasks[i].Start(); 
} 

GetConsumingEnumerable возвращает перечислитель, который получит следующий элемент из очереди, пока очередь не опустеет. Он также отлично справляется с отменой, что немного сложнее с ConcurrentQueue.

В общем, в любое время, когда вы думаете об использовании ConcurrentQueue<T>, вы, вероятно, захотите BlockingCollection<T>.

 Смежные вопросы

  • Нет связанных вопросов^_^