Задача здесь довольно проста (или так я думал ...). Я хочу заполнить очередь с помощью методов, которые будут выполняться (все они возвратят результат объекта), а затем я хочу, чтобы какое-то количество потоков выходило из этой очереди, выполняли методы и добавляли результаты в какую-то другую коллекцию (словарь в этом случае), который будет возвращен, когда вся работа будет завершена. Основной метод будет вызываться в основном потоке, который запустит обработку и должен заблокировать, пока все потоки не закончат выполнение того, что они делают, и возвратите коллекцию с результатами. Так я собрал этот класс:C# Lock Threading Issue
public class BackgroundWorkManager
{
public delegate object ThreadTask();
private Thread[] workers;
private ManualResetEvent workerThreadMre;
private ManualResetEvent mainThreadMre;
private Queue<WorkItem> workQueue;
private Dictionary<string, object> results;
private object writeLock;
private int activeTasks;
private struct WorkItem
{
public string name;
public ThreadTask task;
public WorkItem(string name, ThreadTask task)
{
this.name = name;
this.task = task;
}
}
private void workMethod()
{
while (true)
{
workerThreadMre.WaitOne();
WorkItem task;
lock (workQueue)
{
if (workQueue.Count == 0)
{
workerThreadMre.Reset();
continue;
}
task = workQueue.Dequeue();
}
object result = task.task();
lock (writeLock)
{
results.Add(task.name, result);
activeTasks--;
if (activeTasks == 0)
mainThreadMre.Set();
}
}
}
public BackgroundWorkManager()
{
workers = new Thread[Environment.ProcessorCount];
workerThreadMre = new ManualResetEvent(false);
mainThreadMre = new ManualResetEvent(false);
workQueue = new Queue<WorkItem>();
writeLock = new object();
activeTasks = 0;
for (int i = 0; i < Environment.ProcessorCount; i++)
{
workers[i] = new Thread(workMethod);
workers[i].Priority = ThreadPriority.Highest;
workers[i].Start();
}
}
public void addTask(string name, ThreadTask task)
{
workQueue.Enqueue(new WorkItem(name, task));
}
public Dictionary<string, object> process()
{
results = new Dictionary<string, object>();
activeTasks = workQueue.Count;
mainThreadMre.Reset();
workerThreadMre.Set();
mainThreadMre.WaitOne();
workerThreadMre.Reset();
return results;
}
}
Это прекрасно работает, если я использовал объект один раз, чтобы обработать очередь методов, но если я пытаюсь что-то вроде этого перерыва
BackgroundWorkManager manager = new BackgroundWorkManager();
for (int i = 0; i < 20; i++)
{
manager.addTask("result1", (BackgroundWorkManager.ThreadTask)delegate
{
return (object)(1);
});
manager.process();
}
Things. Я либо зацикливаюсь, либо получаю исключение, говоря, что словарь, в котором я пишу результаты, уже содержит ключ (но Visual Studio Debugger говорит, что он пуст). Добавление «Thread.Sleep (1)» к методу работы, похоже, исправить, что причудливо. Это мой первый опыт работы с потоками, поэтому я не уверен, что я ужасно злоупотребляю замками или что. Если бы кто-нибудь мог дать некоторое представление о том, что я делаю неправильно, это было бы весьма признательно.
Вы добавляете несколько задач с тем же именем. Почему вы удивлены тем, что «словарь уже содержит ключ»? Кроме того, должно быть довольно легко удовлетворить ваши требования с помощью PLINQ. Простой 'AsParallel' +' ToDictionary' должен сделать трюк, без блокировки или чего-то еще. – MarcinJuraszek
Поскольку метод процесса создает новый экземпляр словаря, результаты которого затем добавляются. – user1869878
Тот факт, что он создает новый словарь, не имеет никакого значения. Вы назначаете это поле в классе, которое используется несколькими разными потоками, поэтому довольно вероятно, что несколько потоков будут записываться в один и тот же экземпляр словаря '. –
MarcinJuraszek