2015-02-14 3 views
2

У меня есть служба облачного облака с классической конфигурацией WebRole + Worker. Рабочий принимает сообщения из очереди, обрабатывает их, а затем удаляет их один раз за раз.Обработка работы многопоточной обработки роли Azure Worker

Мой код выглядит так:

public override void Run() 
    { 
     Trace.TraceInformation("Worker is running"); 
      try 
      { 
       this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
      } 
      finally 
      { 
       this.runCompleteEvent.Set(); 
      } 
    } 

public override bool OnStart() 
     { 
      ServicePointManager.DefaultConnectionLimit = 500; 
      bool result = base.OnStart(); 
      Trace.TraceInformation("WorkerAnalytics has been started"); 
      return result; 
     } 



private async Task RunAsync(CancellationToken cancellationToken) 
     { 
      var queue = ....//omitted info for brevity 
      CloudQueueMessage retrievedMessage = null; 

      while (!cancellationToken.IsCancellationRequested) 
      { 
       try 
        { 
         retrievedMessage = await queue.GetMessageAsync(); 
         if (retrievedMessage != null) 
         { 
          await ProcessMessage(retrievedMessage); 
         } 
         else 
         { 
          System.Threading.Thread.Sleep(500); 
         } 
        } 
        catch (Exception e) 
        { 
         System.Threading.Thread.Sleep(500); 
        } 
       } 
      } 

     } 

Теперь это работает отлично, но процессор является очень низким, на уровне 3%, он обрабатывает только один элемент в то время (около 1 сек каждый), но очередь имеют около 1000 новых элементов в секунду, и этого недостаточно.

Как обрабатывать больше сообщений очереди за раз, используя всю мощность ЦП, имеющуюся в машине, и не слишком усложнять этот код?

Также для чего предназначен ServicePointManager.DefaultConnectionLimit?

Я искал часы для эффективного многопоточного решения для рабочих ролей, но теперь все WebJobs или старые рамки, которые усложняют ситуацию.

Спасибо

ответ

4

Вы можете попробовать запустить несколько Такс из RunAsync().

var tasks = new List<Task>(); 
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token)); 
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token)); 
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token)); 
Task.WaitAll(tasks.ToArray()); 
+0

спасибо, это похоже на работу! Сколько заданий вы рекомендуете создавать? Лазурная машина является двухъядерной с 3 ГБ оперативной памяти Очередь очень быстро заполнена, как 1000 новых записей/сек, я боюсь, что мне понадобится много примеров. –

+0

Ну старайтесь постепенно увеличивать количество задач и находить нужное количество задач. Если вы начнете видеть повышение производительности процессора или производительности до более крупной виртуальной машины или выполните несколько экземпляров рабочей роли – pauliusnrk

+0

@pauliusnrk Где мне нужно написать этот код? вместо этой строки "this.RunAsync (this.cancellationTokenSource.Token) .Wait();" ? – Ron