2015-08-13 6 views
1

Я изменяю существующий проект Windows Workflow Foundation, который был ранее закодирован, чтобы запускать все синхронно. Однако, поскольку набор данных вырос, это необходимо для изменения в соответствии с требованиями к производительности.Истинный параллелизм в коллекции в Windows WF 4.5

Что у меня есть: enter image description here

Внутри процесса У меня есть родительский последовательности рабочего процесса, который содержит несколько элементарных рабочих процессов, которые в основном набор нескольких услуг и подготавливает их для работы. У меня есть основная часть работы рабочего процесса, которая состоит из рабочего процесса ForEach, который работает с коллекцией из примерно 15000 элементов, которые занимают около 1-3 секунд для каждого элемента для обработки (тайминги составляют около 70% CPU, 10% задержка в сети , 20% запросов/доступа к базе данных). Очевидно, что это слишком долго тянет WAYYYY. Мне нужно, чтобы улучшить это время, примерно в 5 (занимает около 5-6 часов, нужно, чтобы добраться до около 1 часа)

Delima:

Я никогда не работал с Windows, Workflows до этого проекта поэтому я очень не знаком с тем, как добиться в противном случае простых реализаций параллельного выполнения в коллекции.

Идея:

Я читал о различном Workflow деятельности и решил, что ParallelForEach Workflow активность, вероятно, будет путем. Моя идея состояла в том, что я просто отключу свой рабочий процесс ForeEach с активностью ParallelForEach Workflow и достигню параллелизма в том, как Parallel.Foreach() работает в параллельной библиотеке задач. К сожалению, это не похоже на то, как выполняется ParallelForEach Workflow Activity. Вместо того, чтобы планировать работу над каждой коллекцией в нескольких потоках и переключать контекст при ожидании очередного потока, ParallelForEach Workflow Activity, похоже, просто помещает каждую итерацию в стек и работает на них почти синхронно, если только тело рабочего процесса «Idle» (который я не верю, что это то же самое, как «ожидание» на I/O Вроде бы явное состояние, которое должно быть установлено на рабочем процессе деятельности, в MSDN:.

ParallelForEach перечисляет его значения и графики для тела для каждого значения, которое он перечисляет. Он только планирует Тело. Как выполняется тело , зависит от того, будет ли тело простаивать. Если тело не не работает, оно выполняется в режиме ожидания se, так как запланированные операции обрабатываются как стек, выполняется первая запланированная активность . Например, если у вас есть коллекция {1,2,3,4} в ParallelForEach и используйте WriteLine в качестве тела для записи значения . У вас есть 4, 3, 2, 1, напечатанные на консоли. Это связано с тем, что WriteLine не переходит в режим ожидания, поэтому после выполнения 4 операций WriteLine они выполняются с использованием поведения стека (сначала в последнем случае).

Но если у вас есть действия в теле, которые могут простаивать, как Получение активности или Задержка. Тогда нет необходимости ждать завершения . ParallelForEach переходит к следующему плановому телу и пытается его выполнить. Если эта активность также простаивает, ParallelForEach снова переходит к следующей активности тела.

Где я сейчас:

При выполнении моей «идеи» выше с ParallelForEach Workflow деятельности, я достигаю примерно в то же время работает в качестве обычного ForEach Workflow деятельности. Я рассматривал возможность создания базового метода BeginWorkflow async, но я не уверен, что это будет хорошая идея или нет с тем, как работает Windows WF.

Мне нужна ваша помощь:

Кто-нибудь есть какие-либо предложения о том, как я могу достичь результатов, которые я пытаюсь попасть? Есть ли другой способ реализовать то, что будет выполнять тело рабочего процесса foreach параллельно на столько потоков, сколько возможно? У меня 8 логических процессоров, и я хочу использовать их все, потому что каждая итерация коллекции не зависит от других.

Любые идеи ??

+1

Если большую часть времени из-за I/O, то, чтобы сделать его параллельно не даст вам такой большой шаг вперед ... –

+0

Верно, но я закодировали реализации этих методов использовать Async/Ждут так поток освобождается. Однако, поскольку все они работают в одном потоке, это, похоже, не помогает, потому что хотя поток свободен, следующая итерация тела не выполняется. Я считаю, что это нужно сделать больше с тем, как реализуется планировщик Workflow. –

+0

1.5 x 15000 = 6 часов и 15 минут. Или 1,5 не является правильным средним? –

ответ

0

Рабочая среда Workflow однопоточная. Чтобы действительно выполнять параллельную работу, вам нужно управлять своими потоками (как-то). Я предполагаю, что ваши действия просто делают свое дело в методе Execute, и время выполнения позволяет выполнять только одно выполнение за один раз.

Ниже приведен код для класса NonblockingNativeActivity. Это было полезно для нас, я надеюсь, это и поможет вам. Используйте это как базовый класс для своих действий, вместо того чтобы переопределять Execute, переопределить ExecuteNonblocking. Вы также можете переопределить PrepareToExecute и AfterExecute, если вам нужно работать со временем выполнения Workflow, но они будут однопоточными.

using System.Text; 
using System.Activities.Hosting; 
using System.Activities; 
using System.Diagnostics; 
using System.Threading.Tasks; 
using System.Threading; 

namespace Sample.Activities 
{ 
    /// <summary> 
    /// Class Non-Blocking Native Activity 
    /// </summary> 
    public abstract class NonblockingNativeActivity : NativeActivity 
    { 
     private Variable<NoPersistHandle> NoPersistHandle { get; set; } 
     private Variable<Bookmark> Bookmark { get; set; } 

     private Task m_Task; 
     private Bookmark m_Bookmark; 
     private BookmarkResumptionHelper m_BookmarkResumptionHelper; 

     /// <summary> 
     /// Allows the activity to induce idle. 
     /// </summary> 
     protected override bool CanInduceIdle 
     { 
      get 
      { 
       return true; 
      } 
     } 

     /// <summary> 
     /// Prepars for Execution 
     /// </summary> 
     /// <param name="context"></param> 
     protected virtual void PrepareToExecute(
      NativeActivityContext context) 
     { 
     } 

     /// <summary> 
     /// Executes a Non-blocking Activity 
     /// </summary> 
     protected abstract void ExecuteNonblocking(); 

     /// <summary> 
     /// After Execution Completes 
     /// </summary> 
     /// <param name="context"></param> 
     protected virtual void AfterExecute(
      NativeActivityContext context) 
     { 
     } 

     /// <summary> 
     /// Executes the Activity 
     /// </summary> 
     /// <param name="context"></param> 
     protected override void Execute(NativeActivityContext context) 
     { 

      // 
      // We must enter a NoPersist zone because it looks like we're idle while our 
      // Task is executing but, we aren't really 
      // 
      NoPersistHandle noPersistHandle = NoPersistHandle.Get(context); 
      noPersistHandle.Enter(context); 

      // 
      // Set a bookmark that we will resume when our Task is done 
      // 
      m_Bookmark = context.CreateBookmark(BookmarkResumptionCallback); 
      this.Bookmark.Set(context, m_Bookmark); 
      m_BookmarkResumptionHelper = context.GetExtension<BookmarkResumptionHelper>(); 

      // 
      // Prepare to execute 
      // 
      PrepareToExecute(context); 

      // 
      // Start a Task to do the actual execution of our activity 
      // 
      CancellationTokenSource tokenSource = new CancellationTokenSource(); 
      m_Task = Task.Factory.StartNew(ExecuteNonblocking, tokenSource.Token); 
      m_Task.ContinueWith(TaskCompletionCallback); 
     } 

     private void TaskCompletionCallback(Task task) 
     { 
      if (!task.IsCompleted) 
      { 
       task.Wait(); 
      } 

      // 
      // Resume the bookmark 
      // 
      m_BookmarkResumptionHelper.ResumeBookmark(m_Bookmark, null); 
     } 


     private void BookmarkResumptionCallback(NativeActivityContext context, Bookmark bookmark, object value) 
     { 
      var noPersistHandle = NoPersistHandle.Get(context); 

      if (m_Task.IsFaulted) 
      { 
       // 
       // The task had a problem 
       // 
       Console.WriteLine("Exception from ExecuteNonBlocking task:"); 
       Exception ex = m_Task.Exception; 
       while (ex != null) 
       { 
        Console.WriteLine(ex.Message); 
        ex = ex.InnerException; 
       } 

       // 
       // If there was an exception exit the no persist handle and rethrow. 
       // 
       if (m_Task.Exception != null) 
       { 
        noPersistHandle.Exit(context); 
        throw m_Task.Exception; 
       } 
      } 

      AfterExecute(context); 

      noPersistHandle.Exit(context); 
     } 

     // 
     // TODO: How do we want to handle cancelations? We can pass a CancellationToekn to the task 
     // so that we cancel the task but, maybe we can do better than that? 
     // 
     /// <summary> 
     /// Abort Activity 
     /// </summary> 
     /// <param name="context"></param> 
     protected override void Abort(NativeActivityAbortContext context) 
     { 
      base.Abort(context); 
     } 

     /// <summary> 
     /// Cancels the Activity 
     /// </summary> 
     /// <param name="context"></param> 
     protected override void Cancel(NativeActivityContext context) 
     { 
      base.Cancel(context); 
     } 

     /// <summary> 
     /// Registers Activity Metadata 
     /// </summary> 
     /// <param name="metadata"></param> 
     protected override void CacheMetadata(NativeActivityMetadata metadata) 
     { 
      base.CacheMetadata(metadata); 
      this.NoPersistHandle = new Variable<NoPersistHandle>(); 
      this.Bookmark = new Variable<Bookmark>(); 
      metadata.AddImplementationVariable(this.NoPersistHandle); 
      metadata.AddImplementationVariable(this.Bookmark); 
      metadata.RequireExtension<BookmarkResumptionHelper>(); 
      metadata.AddDefaultExtensionProvider<BookmarkResumptionHelper>(() => new BookmarkResumptionHelper()); 
     } 
    } 
}