2014-09-29 2 views
1

Проблема: Есть несколько потоков, обращающихся к ресурсу. Мне нужно ограничить их число постоянным MaxThreads. Темы, которые не могут войти в пул потоков, должны получить сообщение об ошибке.Пул потоков с BlockingCollection

Решение: Я начал использовать в алгоритме ниже в BlockingCollection<string> pool, но я вижу, что BlockingCollection требует вызова CompleteAdding, который я не могу сделать, потому что я всегда получаю входящие сообщения (я жёстко 10 в примере ниже для отладки), подумайте о веб-запросах.

public class MyTest { 

    private const int MaxThreads = 3; 

    private BlockingCollection<string> pool; 

    public MyTest() { 
     pool = new BlockingCollection<string>(MaxThreads); 
    } 

    public void Go() { 
     var addSuccess = this.pool.TryAdd(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 
     if (!addSuccess) Console.WriteLine(string.Format("thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 
     Console.WriteLine(string.Format("Adding thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 
     Console.WriteLine(string.Format("Pool size: {0}", pool.Count)); 

     // simulate work 
     Thread.Sleep(1000); 

     Console.WriteLine("Thread ID#{0} " + Thread.CurrentThread.ManagedThreadId + " is done doing work."); 
     string val; 
     var takeSuccess = this.pool.TryTake(out val); 
     if (!takeSuccess) Console.WriteLine(string.Format("Failed to take out thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 
     Console.WriteLine("Taking out " + val); 

     Console.WriteLine(string.Format("Pool size: {0}", pool.Count)); 
     Console.WriteLine(Environment.NewLine); 
    } 
} 

static void Main() 
{ 
    var t = new MyTest(); 

    Parallel.For(0, 10, x => t.Go()); 
} 

Любые идеи о том, как я могу добиться этого лучше?

Спасибо!

P.S. Многопоточный новичок здесь, если у вас есть предложения по чтению материалов, я бы очень признателен им.

LE: Исходя из ответов, которые я получил, я был в состоянии достичь желаемого поведения с помощью этого алгоритма:

public class MyTest { 

    private const int MaxThreads = 3; 

    private SemaphoreSlim semaphore; 

    public MyTest() { 
     semaphore = new SemaphoreSlim(MaxThreads, MaxThreads); 
    } 

    public void Go() { 

     Console.WriteLine(string.Format("In comes thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 
     semaphore.Wait(); 

     try { 

     Console.WriteLine(string.Format("Serving thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 
     // simulate work 
     Thread.Sleep(1000); 
     Console.WriteLine(string.Format("Out goes thread ID#{0}", Thread.CurrentThread.ManagedThreadId)); 

     } 

     finally { 
      semaphore.Release(); 
     } 

    } 
} 

static void Main() 
{ 
    var t = new MyTest(); 

    Parallel.For(0, 10, x=> t.Go()); 
} 
+1

Рассмотрите возможность использования семафора (http://msdn.microsoft.com/en-us/library/system.threading.semaphore (v = vs.110) .aspx). Семафор подобен вышибалу. Он разрешает определенное количество людей входить и выходить. Если в нем больше людей, чем может работать бар, он никому не разрешит, если кто-то не уйдет. Вы можете использовать перегрузки WaitOne для ожидания указания времени до того, как вы получите исключение тайм-аута – oleksii

ответ

5

Если вы хотите защитить определенное количество потоков, которые могут получить доступ к критической области за раз, вам нужно будет использовать Semaphore или SemaphoreSlim. Я предлагаю последний, который имеет небольшой вес по сравнению с прежним.

Одним из недостатков SemaphoreSlim является то, что они не будут работать с перекрестным процессом, но это нормально, мы имеем Semaphore, чтобы помочь.

Вы можете проверить, заполнен ли Семафор через один из методов ожидания, предоставляемых инфраструктурой с таймаутом.

SemaphoreSlim semaphore = new SemaphoreSlim(3, 3); 

if (!semaphore.Wait(0)) 
{ 
    //Already semaphore full. 
    //Handle it as you like 
} 

http://www.albahari.com/threading/ является очень хорошим ресурсом для нарезания резьбы.