0

В настоящее время я занимаюсь разработкой системы обмена сообщениями для своего приложения (которое использует AMQP на сервере через RabbitMQ). Будут несколько экземпляров, в которых метод может получать данные из нескольких источников одновременно (т. Е. Не обязательно иметь последовательные запросы).Threading to Parallelize Synchronous IO in .Net

Первоначально я собирался использовать ThreadPool и QueueUserWorkItem для каждого другого запроса в методе, а затем каким-то образом присоединиться к ним. Это может быть проблематично, потому что несколько разных компонентов приложения могут сделать это сразу, и каждый компонент может иметь большое количество параллельных запросов, которые могли бы голодать ThreadPool.

Есть ли более эффективный/эффективный способ сделать это?

ответ

2

Это нормально, чтобы подчеркнуть threadpool. Вы можете бросить кучу workitems на нем - сотни, тысячи - и просто позвольте er rip. Не уверен, что вы подразумеваете под «голоданием». Если не существует набора рабочих элементов, которые должны быть приоритетными по-разному, вам, вероятно, не нужно беспокоиться о голоде.

Если вы используете QUWI, вам решать, как объединить полученные результаты снова в один результат.


Звучит так, как будто вы делаете подход к карте/уменьшению. Вот быстрая функция карты, использующая QUWI, и пример того, как ее использовать.

public static IEnumerable<T2> Map_QUWI<T, T2>(List<T> inputs, Func<T, T2> fn) 
{ 
    int c = inputs.Count; 
    if (c == 0) return null; 
    T2[] result = new T2[c]; 
    if (c == 1) 
    { 
     // only one input - perform the work on main thread 
     result[0] = fn(inputs[0]); 
     return result; 
    } 

    using (ManualResetEvent done = new ManualResetEvent(false)) 
    { 
     int countdown = inputs.Count; 
     WaitCallback cb = delegate (Object obj) 
      { 
       int ix = (int)obj; 
       result[ix] = fn(inputs[ix]); 
       if (Interlocked.Decrement(ref countdown) == 0) 
        done.Set(); // signal all done 
      }; 

     // queue up all workitems 
     for (int i = 0; i < c; i++) 
      ThreadPool.QueueUserWorkItem(cb,i); 

     // Wait for done.Set(), which happens in the WaitCallback 
     // when the last workitem is completed. 
     done.WaitOne(); 
    } 

    return result; 
} 

Пример использования:

// returns the number of prime numbers, less than or equal to x 
private int NumberOfPrimesLessThanOrEqualTo(int x) 
{ 
    int count= 0; 
    int n = x; 
    if (n>=2) count++; 
    if (x%2==0) n--; 
    if (n>0) 
    { 
     do 
     { 
      if (IsPrime(n)) count++; 
      n-=2; 
     } while (n>0); 
    } 
    return count; 
} 


private void Demo() 
{ 
    var list = new List<int>(new int[] {2,4,8,16,32,64,128,256,512,1024,2048, 
             4096,8192,16384,32768,65536,131072}); 
    Func<int,int> fn = NumberOfPrimesLessThanOrEqualTo; 
    var result= Map_QUWI(list, fn); 
    (new List<int>(result)).ForEach(System.Console.WriteLine); 
}