2017-02-07 8 views
1

У меня есть около 10 000 000 задач, каждый из которых занимает от 1 до 10 секунд. Я запускаю эти задачи на мощном сервере, используя 50 разных потоков, где каждый поток выбирает первую незавершенную задачу, запускает ее и повторяет.Выполнение многолетней параллельной задачи в фоновом режиме, позволяя небольшим задачам асинхронного обновления переднего плана

Псевдо-код:

for i = 0 to 50: 
    run a new thread: 
     while True: 
      task = first available task 
      if no available tasks: exit thread 
      run task 

Используя этот код, я могу выполнить все задачи в parallell на любом заданном числе потоков.

В действительности, код использует С # Task.WhenAll, и выглядит следующим образом:

ServicePointManager.DefaultConnectionLimit = threadCount; //Allow more HTTP request simultaneously 
var currentIndex = -1; 
var threads = new List<Task>(); //List of threads 
for (int i = 0; i < threadCount; i++) //Generate the threads 
{ 
    var wc = CreateWebClient(); 
    threads.Add(Task.Run(() => 
    { 
     while (true) //Each thread should loop, picking the first available task, and executing it. 
     { 
      var index = Interlocked.Increment(ref currentIndex); 
      if (index >= tasks.Count) break; 
      var task = tasks[index]; 
      RunTask(conn, wc, task, port); 
     } 
    })); 
} 

await Task.WhenAll(threads); 

Это работает точно так же, как я хотел, но у меня есть проблема: поскольку этот код занимает много времени для запуска, я хочу, чтобы пользователь увидел некоторый прогресс. Прогресс отображается в цветном растровом изображении (представляющем матрицу), а также требуется некоторое время для создания (несколько секунд).

Поэтому я хочу сгенерировать эту визуализацию в фоновом потоке. Но этот другой фоновый поток никогда не выполняется. Мое подозрение состоит в том, что он использует тот же пул потоков, что и параллельный код, и поэтому находится в очереди и не будет выполняться до того, как будет закончен параллельный код. (И это немного слишком поздно.)

Вот пример того, как я произвожу визуализацию прогресса:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e) 
{ 
    var bitmap = await Task.Run(() => // <<< This task is never executed! 
    { 
     //bla, bla, various database calls, and generating a relatively large bitmap 
    }); 

    //Convert the bitmap into a WPF image, and update the GUI 
    VisualizationImage = BitmapToImageSource(bitmap); 
} 

Итак, как же я мог лучше всего решить эту проблему? Я мог бы создать список из Task s, где каждый Task представляет одну из моих задач и запускает их с Parallel.Invoke и выбирает другой пул потоков (я думаю). Но тогда мне нужно создать 10 миллионов Task объектов, а не только 50 Task объектов, проходящих через мой массив вещей, чтобы сделать. Похоже, что он использует гораздо больше ОЗУ, чем необходимо. Какие-нибудь умные решения?

EDIT: Как Панайотис Kanavos предложил в одном из своих комментариев, я попытался заменить некоторые из моей логики петли с ActionBlock, как это:

// Create an ActionBlock<int> that performs some work. 
var workerBlock = new ActionBlock<ZoneTask>(
t => 
{ 
    var wc = CreateWebClient(); //This probably generates some unnecessary overhead, but that's a problem I can solve later. 
    RunTask(conn, wc, t, port); 
}, 
// Specify a maximum degree of parallelism. 
new ExecutionDataflowBlockOptions 
{ 
    MaxDegreeOfParallelism = threadCount 
}); 

foreach (var t in tasks) //Note: the objects in the tasks array are not Task objects 
    workerBlock.Post(t); 
workerBlock.Complete(); 

await workerBlock.Completion; 

Примечание: RunTask просто выполняет веб-запрос, используя WebClient, и анализирует результаты. Здесь нет ничего, что могло бы создать мертвый замок.

Это, похоже, работает как старый код параллелизма, за исключением того, что для выполнения заданий требуется 1-2 минуты, чтобы выполнить начальный цикл foreach. Действительно ли эта задержка стоит того?

Тем не менее, моя задача прогресса по-прежнему блокируется. Игнорирование Прогресс < T> предложение на данный момент, так как этот сниженный код все еще страдает та же проблема:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e) 
{ 
    Debug.WriteLine("This happens"); 
    var bitmap = await Task.Run(() => 
    { 
     Debug.WriteLine("This does not!"); 
     //Still doing some work here, so it's not optimized away. 
    }; 

    VisualizationImage = BitmapToImageSource(bitmap); 
} 

Так он по-прежнему выглядит как новые задачи не выполняются до тех пор, как parallell задача выполняется. Я даже уменьшил «MaxDegreeOfParallelism» с 50 до 5 (на 24-ядерном сервере), чтобы увидеть, было ли предложение Питера Ричи правильным, но никаких изменений. Любые другие предложения?

ДРУГОЙ EDIT:

Проблема, кажется, в том, что я перегружен пул потоков с вызовы ввода/вывода всей моей одновременной блокировкой. Я заменил WebClient HttpClient и его асинхронные функции, и теперь все работает хорошо.

Спасибо всем за отличные предложения! Хотя не все из них напрямую решили проблему, я уверен, что все они улучшили мой код. :)

+0

Я думаю, это может дать вам отправную точку http://stackoverflow.com/questions/548208/how-to-call-a-method-async-with-some-kind-of-priority – Alex

+0

. У .NET уже есть такой механизм через «Прогресс < T>» и «IProgress» < T> « –

+0

Задачи не являются потоками. Сам TPL заботится об использовании потоков для обработки полезной нагрузки задачи. Что делает 'RunTask' и почему вы просто не используете' Task.Run'? –

ответ

1

.NET уже предоставляет механизм для отчета о прогрессе с IProgress< T> и Progress< T>.

Интерфейс IProgress позволяет клиентам публиковать сообщения с классом Report(T), не беспокоясь о потоковой передаче. Реализация гарантирует, что сообщения обрабатываются в соответствующем потоке, например, в потоке пользовательского интерфейса. Используя простой интерфейс IProgress< T>, фоновые методы отделены от тех, кто обрабатывает сообщения.

Дополнительную информацию можно найти в статье Async in 4.5: Enabling Progress and Cancellation in Async APIs. API-интерфейсы отмены и прогресса не относятся к TPL. Они могут использоваться для упрощения отмены и отчетности даже для сырых потоков.

Прогресс < T> обрабатывает сообщения в потоке, на котором он был создан. Это можно сделать либо путем передачи делегата обработки при создании экземпляра класса, либо путем подписки на событие. Копирование из статьи:

private async void Start_Button_Click(object sender, RoutedEventArgs e) 
{ 
    //construct Progress<T>, passing ReportProgress as the Action<T> 
    var progressIndicator = new Progress<int>(ReportProgress); 
    //call async method 
    int uploads=await UploadPicturesAsync(GenerateTestImages(), progressIndicator); 
} 

где ReportProgress это метод, который принимает параметр Int. Он также может принять сложный класс, сообщившие проделанную работу, сообщения и т.д.

асинхронный метод только должен использовать IProgress.Report, например:

async Task<int> UploadPicturesAsync(List<Image> imageList, IProgress<int> progress) 
{ 
     int totalCount = imageList.Count; 
     int processCount = await Task.Run<int>(() => 
     { 
      int tempCount = 0; 
      foreach (var image in imageList) 
      { 
       //await the processing and uploading logic here 
       int processed = await UploadAndProcessAsync(image); 
       if (progress != null) 
       { 
        progress.Report((tempCount * 100/totalCount)); 
       } 
       tempCount++; 
      } 

      return tempCount; 
     }); 
     return processCount; 
} 

Это разъединяет метод фона от кто принимает и обрабатывает сообщения о ходе выполнения.

+0

Благодарим вас за подробный ответ. Однако вы уверены, что это решение моей проблемы? Возможно, я не был достаточно ясен: проблема не связана с прогрессом.Пока выполняются мои параллельные задачи, я не могу запускать НИЖЕ новые задачи из графического интерфейса. Я подозреваю, что это связано с пулами потоков, но я не знаю, что это достаточно хорошо. Хотя Progress , безусловно, выглядит интересным, похоже, что для работы все еще требуется Task.Run (строка 4, 2-й блок кода), и эта задача. Run не сможет выполнить, пока выполняются параллельные задачи, что делает это непригодно. –

+0

@ ErlenD. нет, нет. Если у вас возникли проблемы с блокировкой, это связано с тем, что сам код блокируется. Добавление еще одной задачи. Запуск, который, в конечном итоге, вызывает вызов BeginInvoke, будет по-прежнему блокироваться в потоке пользовательского интерфейса. Вам необходимо ** упростить ** ваш код, удалив любой код, добавленный в «исправить» задачи. –

+1

@ ErlenD. например, задачи не являются потоками. Вы * не * нуждаетесь и * не должны * использовать бесконечный цикл для «выбора задач». Это то, что уже делает TPL. Он использует свои собственные потоки для сбора и обработки задач, * без * блокировки. Вы не отправили код в «RunTask», так что трудно сказать, что происходит не так. –