2017-01-13 5 views
6

Я создал конвейер потока данных TPL, который состоит из 3 TransformBlock и ActionBlock в конце.TPL Dataflow - параллельная и асинхронная обработка, сохраняя порядок

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O 
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations 
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database 

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job)); 
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job)); 
var reportImport = new ActionBlock<Job>(job => CreateResponse(job)); 

loadXml.LinkTo(validateData, job => job.ReturnCode == 100); 
loadXml.LinkTo(loadingFailed); 

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100); 
validateData.LinkTo(validationFailed); 

importJob.LinkTo(reportImport); 

Каждый блок заполнит работы-объект с обработанными данными, так как я не нужно только сами данные, но и общую информацию, что мне нужно для ответных сообщений. Я довольно много добавляю путь к XML и получаю объект Response с информацией, если все пойдет правильно.

Как я могу достичь, чтобы, если приходят два или более файлов, которые требуют времени для чтения с жесткого диска, он считывает оба файла параллельно и асинхронно, сохраняя приказ, в котором они вошли? Если file1 занимает гораздо больше времени, файл 2 должен будет дождаться завершения файла1, прежде чем передать данные на следующий блок, а затем он также начнет проверку данных параллельно и асинхронно, но и здесь сохранит порядок для следующего блока ?

Прямо сейчас, даже если я назову SendAsync на головной блок, он будет последовательно обрабатывать все файлы.

EDIT: Итак, я написал небольшой тестовый класс для своей цели трубопровода. Он имеет 3 этапа. То, что я хочу достичь, - это первый TransformBlock, который позволяет читать в файлах по мере их поступления (SendAsync из FileSystemWatcher) и выводить их, когда они делаются, чтобы они вошли. Значит, если File1 - это большой файл, а File2 + 3 входит, оба будет читаться, а File1 все еще обрабатывается, но File2 + 3 придется ждать, пока он не сможет отправить второй TransformBlock, потому что File1 все еще читается. Stage2 должен работать точно так же. Stage3, с другой стороны, должен принимать объекты, сгенерированные из File1, и сохранять в базу данных, которые могут выполняться параллельно и асинхронно. Тем не менее, объекты из файла1 должны быть обработаны до файлов file2 и file3. Таким образом, содержимое файла в целом необходимо обрабатывать последовательно, чтобы они вошли. Я попытался сделать это, ограничив третий TransformBlock MaxDegreeOfParallelism и BoundedCapacity, оба установлены в 1, но это, похоже, не работает и не поддерживает порядок в консоли .WriteLine в

using System; 
using System.Collections.Generic; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 
using System.Xml; 
using System.Linq; 

namespace OrderProcessing 
{ 
    public class Job 
    { 
     public string Path { get; set; } 

     public XmlDocument Document { get; set; } 

     public List<Object> BusinessObjects { get; set; } 

     public int ReturnCode { get; set; } 

     public int ID { get; set; } 
    } 

    public class Test 
    { 
     ITargetBlock<Job> pathBlock = null; 

     CancellationTokenSource cancellationTokenSource; 

     Random rnd = new Random(); 

     private bool ReadDocument(Job job) 
     { 
      Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}"); 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      // Throw OperationCanceledException if cancellation is requested. 
      cancellationTokenSource.Token.ThrowIfCancellationRequested(); 

      // Read the document 
      job.Document = new XmlDocument(); 

      // Some checking 
      return true; 
     } 

     private bool ValidateXml(Job job) 
     { 
      Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}"); 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      // Throw OperationCanceledException if cancellation is requested. 
      cancellationTokenSource.Token.ThrowIfCancellationRequested(); 

      // Check XML against XSD and perform remaining checks 
      job.BusinessObjects = new List<object>(); 

      // Just for tests 
      job.BusinessObjects.Add(new object()); 
      job.BusinessObjects.Add(new object()); 

      // Parse Xml and create business objects 
      return true; 
     } 

     private bool ProcessJob(Job job) 
     { 
      Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}"); 

      // Throw OperationCanceledException if cancellation is requested. 
      cancellationTokenSource.Token.ThrowIfCancellationRequested(); 

      Parallel.ForEach(job.BusinessObjects, bO => 
      { 
       ImportObject(bO); 
      }); 


      // Import the job 
      return true; 
     } 

     private object ImportObject(object o) 
     { 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      return new object(); 
     } 

     private void CreateResponse(Job job) 
     { 
      if(job.ReturnCode == 100) 
      { 
       Console.WriteLine("ID {0} was successfully imported.", job.ID); 

      } 
      else 
      { 
       Console.WriteLine("ID {0} failed to import.", job.ID); 
      } 

      // Create response XML with returncodes 
     } 

     ITargetBlock<Job> CreateJobProcessingPipeline() 
     { 
      var loadXml = new TransformBlock<Job, Job>(job => 
      { 
       try 
       { 
        if(ReadDocument(job)) 
        { 
         // For later error handling 
         job.ReturnCode = 100; // success 
        } 
        else 
        { 
         job.ReturnCode = 200; 
        } 

        return job; 
       } 
       catch(OperationCanceledException) 
       { 
        job.ReturnCode = 300; 
        return job; 
       } 
      }, TransformBlockOptions()); 

      var validateXml = new TransformBlock<Job, Job>(job => 
      { 
       try 
       { 
        if(ValidateXml(job)) 
        { 
         // For later error handling 
         job.ReturnCode = 100; 
        } 
        else 
        { 
         job.ReturnCode = 200; 
        } 

        return job; 
       } 
       catch(OperationCanceledException) 
       { 
        job.ReturnCode = 300; 
        return job; 
       } 
      }, TransformBlockOptions()); 


      var importJob = new TransformBlock<Job, Job>(job => 
      { 
       try 
       { 
        if(ProcessJob(job)) 
        { 
         // For later error handling 
         job.ReturnCode = 100; // success 
        } 
        else 
        { 
         job.ReturnCode = 200; 
        } 

        return job; 
       } 
       catch(OperationCanceledException) 
       { 
        job.ReturnCode = 300; 
        return job; 
       } 
      }, ActionBlockOptions()); 

      var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions()); 
      var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions()); 
      var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions()); 

      // 
      // Connect the pipeline 
      // 
      loadXml.LinkTo(validateXml, job => job.ReturnCode == 100); 
      loadXml.LinkTo(loadingFailed); 

      validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100); 
      validateXml.LinkTo(validationFailed); 

      importJob.LinkTo(reportImport); 

      // Return the head of the network. 
      return loadXml; 
     } 

     public void Start() 
     { 
      cancellationTokenSource = new CancellationTokenSource(); 

      pathBlock = CreateJobProcessingPipeline(); 
     } 

     public async void AddJob(string path, int id) 
     { 
      Job j = new Job(); 
      j.Path = path; 
      j.ID = id; 

      await pathBlock.SendAsync(j); 
     } 

     static ExecutionDataflowBlockOptions TransformBlockOptions() 
     { 
      return new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 8, 
       BoundedCapacity = 32 
      }; 
     } 

     private static ExecutionDataflowBlockOptions ActionBlockOptions() 
     { 
      return new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 1, 
       BoundedCapacity = 1 
      }; 
     } 

     public void Cancel() 
     { 
      if(cancellationTokenSource != null) 
       cancellationTokenSource.Cancel(); 
     } 
    } 

    class Program 
    { 
     private static String InputXml = @"C:\XML\Part.xml"; 
     private static Test _Pipeline; 

     static void Main(string[] args) 
     { 
      _Pipeline = new Test(); 
      _Pipeline.Start(); 


      var data = Enumerable.Range(1, 100); 

      foreach(var d in data) 
       _Pipeline.AddJob(InputXml, d); 

      //Wait before closing the application so we can see the results. 
      Console.ReadLine(); 
     } 
    } 
} 

EDIT2: После того как я сделал одно изменение, установив BoundedCapacity в неограниченном, я получил все в порядке, он был отправлен в трубу. Так что раньше это было не по порядку, но сообщения, которые отбросили, я думаю?

Если я удостоверился, что EnsureOrdered правдиво, а также используя MaxDegreeOfParallelism из 8 в последних TransformBlock, детали не в порядке, если вы проверите часть выхода ниже. Но это то, где он должен быть в порядке, поскольку im сохраняет данные в базе данных, которые должны быть в том порядке, в котором он пришел. Не важно, если это не в порядке, когда он оставляет последний TransformBlock, поэтому я думаю, что я не может держать параллелизм здесь?

ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36 
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37 
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9 
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56 
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38 
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58 
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13 
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59 
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10 
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12 
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57 
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60 
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40 
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39 
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11 
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41 
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61 
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42 
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43 
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62 
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63 
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14 

EDIT3: Выход после использования @JSteward последнего кода.

ReadDocument 09:01:03.9363340 JobId: 1 
ReadDocument 09:01:03.9368357 JobId: 5 
ReadDocument 09:01:03.9373347 JobId: 6 
ReadDocument 09:01:03.9368357 JobId: 8 
ReadDocument 09:01:03.9363340 JobId: 4 
ReadDocument 09:01:03.9373347 JobId: 3 
ReadDocument 09:01:03.9373347 JobId: 7 
ReadDocument 09:01:03.9368357 JobId: 2 
ReadDocument 09:01:05.2037570 JobId: 9 
ReadDocument 09:01:05.3108413 JobId: 10 
ReadDocument 09:01:05.5678177 JobId: 11 
ReadDocument 09:01:05.6308763 JobId: 12 
ValidateXml 09:01:05.6338782 JobId: 1 
ValidateXml 09:01:06.3754174 JobId: 2 
ReadDocument 09:01:06.3764184 JobId: 13 
ReadDocument 09:01:06.3764184 JobId: 14 
ReadDocument 09:01:07.3756634 JobId: 15 
ReadDocument 09:01:07.3756634 JobId: 18 
ValidateXml 09:01:07.3756634 JobId: 3 
ValidateXml 09:01:07.3756634 JobId: 4 
ReadDocument 09:01:07.3756634 JobId: 17 
ReadDocument 09:01:07.3756634 JobId: 16 
ReadDocument 09:01:08.3753887 JobId: 19 
ReadDocument 09:01:08.3753887 JobId: 20 
ValidateXml 09:01:08.3753887 JobId: 5 
ProcessJob 09:01:08.3763906 JobId: 1 
ReadDocument 09:01:09.3744411 JobId: 21 
ReadDocument 09:01:09.3749410 JobId: 24 
ProcessJob 09:01:09.3749410 JobId: 2 
ReadDocument 09:01:09.3749410 JobId: 22 
ReadDocument 09:01:09.3749410 JobId: 23 
ReadDocument 09:01:10.3752061 JobId: 25 
ReadDocument 09:01:10.3752061 JobId: 27 
ValidateXml 09:01:10.3752061 JobId: 6 
ValidateXml 09:01:10.3752061 JobId: 7 
ValidateXml 09:01:10.3752061 JobId: 8 
ReadDocument 09:01:10.3752061 JobId: 26 
ReadDocument 09:01:11.3759294 JobId: 29 
ReadDocument 09:01:11.3759294 JobId: 28 
ValidateXml 09:01:11.3764278 JobId: 10 
ReadDocument 09:01:11.3759294 JobId: 31 
ValidateXml 09:01:11.3759294 JobId: 9 
ReadDocument 09:01:11.3759294 JobId: 30 
ValidateXml 09:01:12.3751553 JobId: 11 
ReadDocument 09:01:12.3751553 JobId: 33 
ValidateXml 09:01:12.3751553 JobId: 12 
ReadDocument 09:01:12.3751553 JobId: 34 
ReadDocument 09:01:12.3751553 JobId: 32 
ValidateXml 09:01:13.3753842 JobId: 13 
ValidateXml 09:01:13.3753842 JobId: 14 
ValidateXml 09:01:13.3753842 JobId: 16 
ReadDocument 09:01:13.3753842 JobId: 35 
ReadDocument 09:01:13.3753842 JobId: 36 
ValidateXml 09:01:13.3753842 JobId: 15 
ReadDocument 09:01:14.3756414 JobId: 37 
ValidateXml 09:01:14.3756414 JobId: 19 
ValidateXml 09:01:14.3756414 JobId: 18 
ValidateXml 09:01:14.3756414 JobId: 17 
ReadDocument 09:01:14.3756414 JobId: 40 
ReadDocument 09:01:14.3756414 JobId: 38 
ReadDocument 09:01:14.3756414 JobId: 39 
ProcessJob 09:01:14.3761419 JobId: 3 
SendToDataBase 09:01:14.3806453 JobId: 1 
SendToDataBase 09:01:14.3821472 JobId: 2 
ProcessJob 09:01:14.3821472 JobId: 4 
ValidateXml 09:01:15.3763758 JobId: 20 
ReadDocument 09:01:15.3763758 JobId: 42 
ValidateXml 09:01:15.3763758 JobId: 21 
ReadDocument 09:01:15.3773772 JobId: 43 
ReadDocument 09:01:15.3763758 JobId: 41 
ValidateXml 09:01:15.3768800 JobId: 22 
ReadDocument 09:01:15.3773772 JobId: 44 
ValidateXml 09:01:16.3761117 JobId: 23 
ValidateXml 09:01:16.3761117 JobId: 26 
ValidateXml 09:01:16.3761117 JobId: 24 
ValidateXml 09:01:16.3761117 JobId: 25 
ReadDocument 09:01:16.3761117 JobId: 45 
ReadDocument 09:01:16.3761117 JobId: 46 
ProcessJob 09:01:16.3761117 JobId: 5 
ReadDocument 09:01:17.3758334 JobId: 47 
ValidateXml 09:01:17.3763315 JobId: 28 
ValidateXml 09:01:17.3763315 JobId: 27 
ReadDocument 09:01:17.3763315 JobId: 49 
ReadDocument 09:01:17.3763315 JobId: 48 
ProcessJob 09:01:17.3763315 JobId: 6 
ValidateXml 09:01:17.3763315 JobId: 29 
ReadDocument 09:01:17.3763315 JobId: 50 
ReadDocument 09:01:18.3755786 JobId: 51 
ReadDocument 09:01:18.3755786 JobId: 52 
<<< 
ProcessJob 09:01:18.3770792 JobId: 10 
ProcessJob 09:01:18.3770792 JobId: 9 
ProcessJob 09:01:18.3755786 JobId: 7 
>>> 
ReadDocument 09:01:18.3755786 JobId: 53 
ValidateXml 09:01:18.3755786 JobId: 32 
ValidateXml 09:01:18.3755786 JobId: 31 
ValidateXml 09:01:18.3755786 JobId: 30 
ReadDocument 09:01:18.3760794 JobId: 54 
ProcessJob 09:01:18.3755786 JobId: 8 
ValidateXml 09:01:19.3753274 JobId: 34 
ValidateXml 09:01:19.3753274 JobId: 33 
ReadDocument 09:01:19.3758261 JobId: 56 
ReadDocument 09:01:19.3758261 JobId: 55 
ValidateXml 09:01:19.3758261 JobId: 35 
ValidateXml 09:01:20.3752782 JobId: 36 
ValidateXml 09:01:20.3752782 JobId: 37 
ProcessJob 09:01:20.3757709 JobId: 11 
ReadDocument 09:01:20.3752782 JobId: 57 
ValidateXml 09:01:20.3752782 JobId: 38 
ReadDocument 09:01:20.3757709 JobId: 58 
ReadDocument 09:01:20.3757709 JobId: 59 
ProcessJob 09:01:21.3757202 JobId: 12 
ValidateXml 09:01:21.3757202 JobId: 39 
ReadDocument 09:01:21.3757202 JobId: 62 
ReadDocument 09:01:21.3757202 JobId: 61 
ReadDocument 09:01:21.3757202 JobId: 60 
ReadDocument 09:01:22.3764154 JobId: 63 
ReadDocument 09:01:22.3764154 JobId: 64 
ReadDocument 09:01:22.3764154 JobId: 65 
ProcessJob 09:01:22.3794167 JobId: 16 
ValidateXml 09:01:22.3764154 JobId: 40 
ValidateXml 09:01:22.3764154 JobId: 42 
ReadDocument 09:01:22.3764154 JobId: 66 
ValidateXml 09:01:22.3774149 JobId: 43 
ProcessJob 09:01:22.3764154 JobId: 13 
ValidateXml 09:01:22.3764154 JobId: 41 
ProcessJob 09:01:22.3779160 JobId: 15 
SendToDataBase 09:01:22.3784159 JobId: 3 
ProcessJob 09:01:22.3764154 JobId: 14 
ValidateXml 09:01:22.3859209 JobId: 44 
SendToDataBase 09:01:22.4309993 JobId: 4 
SendToDataBase 09:01:22.4460051 JobId: 5 
SendToDataBase 09:01:22.4465047 JobId: 6 
ReadDocument 09:01:23.3760112 JobId: 67 
ValidateXml 09:01:23.3760112 JobId: 46 
ValidateXml 09:01:23.3760112 JobId: 47 
ReadDocument 09:01:23.3760112 JobId: 68 
ValidateXml 09:01:23.3760112 JobId: 45 
ProcessJob 09:01:23.3760112 JobId: 17 
ValidateXml 09:01:24.3762581 JobId: 48 
ReadDocument 09:01:24.3762581 JobId: 69 
ProcessJob 09:01:24.3762581 JobId: 18 
ProcessJob 09:01:24.3762581 JobId: 19 
ReadDocument 09:01:24.3762581 JobId: 70 
CreateResponse 09:01:24.3777606 JobId: 58 
CreateResponse 09:01:24.3994684 JobId: 59 
CreateResponse 09:01:24.4059908 JobId: 60 
CreateResponse 09:01:24.4114777 JobId: 61 
CreateResponse 09:01:24.4134789 JobId: 62 
ValidateXml 09:01:25.3759607 JobId: 49 
ValidateXml 09:01:25.3759607 JobId: 51 
ProcessJob 09:01:25.3784627 JobId: 22 
ValidateXml 09:01:25.3759607 JobId: 52 
ProcessJob 09:01:25.3759607 JobId: 20 
ValidateXml 09:01:25.3774629 JobId: 53 
ValidateXml 09:01:25.3759607 JobId: 50 
ValidateXml 09:01:25.3774629 JobId: 54 
ReadDocument 09:01:25.3759607 JobId: 72 
ReadDocument 09:01:25.3774629 JobId: 73 
ReadDocument 09:01:25.3759607 JobId: 71 
ReadDocument 09:01:25.3779625 JobId: 74 
ProcessJob 09:01:25.3759607 JobId: 21 
SendToDataBase 09:01:25.3774629 JobId: 7 
CreateResponse 09:01:25.3759607 JobId: 39 
SendToDataBase 09:01:25.4398495 JobId: 8 
SendToDataBase 09:01:25.4448555 JobId: 9 
SendToDataBase 09:01:25.4478565 JobId: 10 
SendToDataBase 09:01:25.4483570 JobId: 11 
CreateResponse 09:01:25.4448555 JobId: 42 
CreateResponse 09:01:25.4608868 JobId: 43 
SendToDataBase 09:01:25.4553682 JobId: 12 
CreateResponse 09:01:25.4613665 JobId: 44 
CreateResponse 09:01:25.4698849 JobId: 45 
ReadDocument 09:01:26.3754874 JobId: 75 
ReadDocument 09:01:26.3754874 JobId: 76 
ReadDocument 09:01:26.3754874 JobId: 78 
ValidateXml 09:01:26.3754874 JobId: 55 
ProcessJob 09:01:26.3759876 JobId: 24 
ProcessJob 09:01:26.3754874 JobId: 23 
ReadDocument 09:01:26.3754874 JobId: 77 
SendToDataBase 09:01:26.3759876 JobId: 13 
SendToDataBase 09:01:26.3980055 JobId: 14 
SendToDataBase 09:01:26.3985045 JobId: 15 
SendToDataBase 09:01:26.4020099 JobId: 16 
ReadDocument 09:01:27.3762164 JobId: 79 
ValidateXml 09:01:27.3762164 JobId: 56 
ProcessJob 09:01:27.3762164 JobId: 26 
ReadDocument 09:01:27.3762164 JobId: 82 
ProcessJob 09:01:27.3762164 JobId: 25 
ReadDocument 09:01:27.3762164 JobId: 81 
ReadDocument 09:01:27.3762164 JobId: 80 
ValidateXml 09:01:27.3762164 JobId: 63 
ValidateXml 09:01:27.3777165 JobId: 64 
ProcessJob 09:01:27.3767157 JobId: 27 
ValidateXml 09:01:27.3762164 JobId: 57 
SendToDataBase 09:01:27.3777165 JobId: 17 
SendToDataBase 09:01:27.4327571 JobId: 18 
SendToDataBase 09:01:27.4357587 JobId: 19 
ReadDocument 09:01:28.3761410 JobId: 83 
ProcessJob 09:01:28.3761410 JobId: 28 
ProcessJob 09:01:28.3761410 JobId: 29 
ValidateXml 09:01:28.3761410 JobId: 66 
SendToDataBase 09:01:28.3761410 JobId: 20 
ProcessJob 09:01:28.3761410 JobId: 30 
ValidateXml 09:01:28.3761410 JobId: 67 
ValidateXml 09:01:28.3761410 JobId: 65 
SendToDataBase 09:01:28.3861483 JobId: 21 
SendToDataBase 09:01:28.4141687 JobId: 22 
ReadDocument 09:01:28.6079764 JobId: 84 
ReadDocument 09:01:28.6552491 JobId: 85 
ReadDocument 09:01:28.7047606 JobId: 86 
ValidateXml 09:01:28.7327861 JobId: 68 
ProcessJob 09:01:28.7327861 JobId: 31 
ReadDocument 09:01:29.1285484 JobId: 87 
ProcessJob 09:01:29.1894672 JobId: 32 
SendToDataBase 09:01:29.1894672 JobId: 23 
SendToDataBase 09:01:29.1944706 JobId: 24 
ReadDocument 09:01:29.3910070 JobId: 88 
ValidateXml 09:01:29.5569691 JobId: 69 
ReadDocument 09:01:29.5995036 JobId: 89 
ValidateXml 09:01:29.6085095 JobId: 70 
ReadDocument 09:01:29.6581266 JobId: 90 
ValidateXml 09:01:29.8797899 JobId: 71 
ValidateXml 09:01:30.1244519 JobId: 72 
ValidateXml 09:01:30.1584763 JobId: 73 
ReadDocument 09:01:30.2100312 JobId: 91 
ProcessJob 09:01:30.2490536 JobId: 33 
ProcessJob 09:01:30.2950865 JobId: 34 
ReadDocument 09:01:30.3290995 JobId: 92 
ProcessJob 09:01:30.3636350 JobId: 35 
SendToDataBase 09:01:30.3636350 JobId: 25 
SendToDataBase 09:01:30.3701300 JobId: 26 
SendToDataBase 09:01:30.3706299 JobId: 27 
ProcessJob 09:01:30.4987430 JobId: 36 
ReadDocument 09:01:30.5642707 JobId: 93 
ReadDocument 09:01:30.6088035 JobId: 94 
ValidateXml 09:01:30.7213868 JobId: 74 
ReadDocument 09:01:30.7544106 JobId: 95 
ProcessJob 09:01:30.7544106 JobId: 37 
SendToDataBase 09:01:30.7544106 JobId: 28 
ProcessJob 09:01:31.1091681 JobId: 38 
SendToDataBase 09:01:31.1091681 JobId: 29 
SendToDataBase 09:01:31.1151730 JobId: 30 
ValidateXml 09:01:31.2012468 JobId: 75 
ValidateXml 09:01:31.2827940 JobId: 76 
ValidateXml 09:01:31.3143168 JobId: 77 
ValidateXml 09:01:31.4073842 JobId: 78 
ReadDocument 09:01:31.4369059 JobId: 96 
ReadDocument 09:01:31.4699302 JobId: 97 
ProcessJob 09:01:31.7201123 JobId: 40 
SendToDataBase 09:01:31.7201123 JobId: 31 
ProcessJob 09:01:32.1569310 JobId: 41 
SendToDataBase 09:01:32.1569310 JobId: 32 
ValidateXml 09:01:32.3650822 JobId: 79 
ValidateXml 09:01:32.3650822 JobId: 80 
ProcessJob 09:01:32.3966047 JobId: 46 
ReadDocument 09:01:32.4236247 JobId: 98 
ReadDocument 09:01:32.4831869 JobId: 99 
ValidateXml 09:01:32.5607342 JobId: 81 
ReadDocument 09:01:32.5777363 JobId: 100 
ProcessJob 09:01:33.1461630 JobId: 47 
ProcessJob 09:01:33.2081967 JobId: 48 
SendToDataBase 09:01:33.2081967 JobId: 33 
SendToDataBase 09:01:33.2137015 JobId: 34 
SendToDataBase 09:01:33.2172021 JobId: 35 
ValidateXml 09:01:33.2347146 JobId: 82 
ValidateXml 09:01:33.4228519 JobId: 83 
ProcessJob 09:01:33.4228519 JobId: 49 
ValidateXml 09:01:33.4373638 JobId: 84 
ProcessJob 09:01:33.4878995 JobId: 50 
SendToDataBase 09:01:33.4878995 JobId: 36 
ProcessJob 09:01:33.5819674 JobId: 51 
ValidateXml 09:01:33.6239992 JobId: 85 
ProcessJob 09:01:33.6239992 JobId: 52 
SendToDataBase 09:01:33.6239992 JobId: 37 
SendToDataBase 09:01:33.6295082 JobId: 38 
ValidateXml 09:01:33.6870563 JobId: 86 
ValidateXml 09:01:33.7125626 JobId: 87 
ProcessJob 09:01:34.1238635 JobId: 53 
ProcessJob 09:01:34.5796949 JobId: 54 
<<< 
SendToDataBase 09:01:34.5796949 JobId: 40 
SendToDataBase 09:01:34.5856995 JobId: 41 
SendToDataBase 09:01:34.5887008 JobId: 46 
>>> 
ValidateXml 09:01:34.7951688 JobId: 88 
ValidateXml 09:01:34.9162007 JobId: 89 
ProcessJob 09:01:34.9541705 JobId: 55 
ValidateXml 09:01:35.0464443 JobId: 90 
ProcessJob 09:01:35.3634898 JobId: 56 
ProcessJob 09:01:35.3795024 JobId: 57 
ValidateXml 09:01:35.5165095 JobId: 91 
ValidateXml 09:01:35.8614345 JobId: 92 
ProcessJob 09:01:35.9985415 JobId: 63 
ValidateXml 09:01:36.0481807 JobId: 93 
ProcessJob 09:01:36.0763064 JobId: 64 
ProcessJob 09:01:36.0993229 JobId: 65 
SendToDataBase 09:01:36.0993229 JobId: 47 
SendToDataBase 09:01:36.1048270 JobId: 48 
ValidateXml 09:01:36.1572079 JobId: 94 
ValidateXml 09:01:36.3791015 JobId: 95 
ProcessJob 09:01:36.4212607 JobId: 66 
SendToDataBase 09:01:36.4212607 JobId: 49 
SendToDataBase 09:01:36.4267655 JobId: 50 
SendToDataBase 09:01:36.4272654 JobId: 51 
SendToDataBase 09:01:36.4322913 JobId: 52 
SendToDataBase 09:01:36.4327837 JobId: 53 
ProcessJob 09:01:36.5149796 JobId: 67 
SendToDataBase 09:01:36.5149796 JobId: 54 
ValidateXml 09:01:36.6861048 JobId: 96 
ValidateXml 09:01:36.7845716 JobId: 97 
ValidateXml 09:01:37.0175979 JobId: 98 
ValidateXml 09:01:37.3788835 JobId: 99 
ValidateXml 09:01:37.6477046 JobId: 100 
ProcessJob 09:01:37.8269808 JobId: 68 
SendToDataBase 09:01:37.8269808 JobId: 55 
ProcessJob 09:01:37.8940108 JobId: 69 
ProcessJob 09:01:38.2955556 JobId: 70 
ProcessJob 09:01:38.3110583 JobId: 71 
SendToDataBase 09:01:38.3110583 JobId: 56 
SendToDataBase 09:01:38.3125586 JobId: 57 
CreateResponse 09:01:38.4551538 JobId: 95 
CreateResponse 09:01:38.4925304 JobId: 96 
ProcessJob 09:01:38.5382532 JobId: 72 
ProcessJob 09:01:38.9129894 JobId: 73 
SendToDataBase 09:01:38.9129894 JobId: 63 
SendToDataBase 09:01:38.9185062 JobId: 64 
SendToDataBase 09:01:38.9189949 JobId: 65 
ProcessJob 09:01:38.9852121 JobId: 74 
ProcessJob 09:01:39.0317458 JobId: 75 
SendToDataBase 09:01:39.0317458 JobId: 66 
SendToDataBase 09:01:39.0377511 JobId: 67 
ProcessJob 09:01:39.6129381 JobId: 76 
SendToDataBase 09:01:39.6129381 JobId: 68 
ProcessJob 09:01:39.7833004 JobId: 77 
SendToDataBase 09:01:39.7833004 JobId: 69 
ProcessJob 09:01:39.8740443 JobId: 78 
ProcessJob 09:01:40.3145731 JobId: 79 
SendToDataBase 09:01:40.3145731 JobId: 70 
SendToDataBase 09:01:40.3205708 JobId: 71 
ProcessJob 09:01:40.4912084 JobId: 80 
ProcessJob 09:01:40.5307205 JobId: 81 
SendToDataBase 09:01:40.5317212 JobId: 72 
ProcessJob 09:01:40.5652454 JobId: 82 
ProcessJob 09:01:41.2902736 JobId: 83 
ProcessJob 09:01:41.2902736 JobId: 84 
ProcessJob 09:01:41.3598244 JobId: 85 
SendToDataBase 09:01:41.3598244 JobId: 73 
SendToDataBase 09:01:41.3663284 JobId: 74 
SendToDataBase 09:01:41.3713317 JobId: 75 
SendToDataBase 09:01:41.3718392 JobId: 76 
SendToDataBase 09:01:41.3723328 JobId: 77 
ProcessJob 09:01:42.2677493 JobId: 86 
SendToDataBase 09:01:42.2677493 JobId: 78 
ProcessJob 09:01:42.6466081 JobId: 87 
ProcessJob 09:01:42.8947969 JobId: 88 
SendToDataBase 09:01:42.8947969 JobId: 79 
ProcessJob 09:01:43.0012509 JobId: 89 
ProcessJob 09:01:43.1513589 JobId: 90 
ProcessJob 09:01:43.4545800 JobId: 91 
SendToDataBase 09:01:43.4545800 JobId: 80 
SendToDataBase 09:01:43.4600832 JobId: 81 
SendToDataBase 09:01:43.4605919 JobId: 82 
ProcessJob 09:01:43.5946813 JobId: 92 
ProcessJob 09:01:44.1731027 JobId: 93 
SendToDataBase 09:01:44.1731027 JobId: 83 
SendToDataBase 09:01:44.1786068 JobId: 84 
SendToDataBase 09:01:44.1816090 JobId: 85 
ProcessJob 09:01:44.4678171 JobId: 94 
SendToDataBase 09:01:44.4678171 JobId: 86 
ProcessJob 09:01:45.3426043 JobId: 97 
SendToDataBase 09:01:45.3426043 JobId: 87 
ProcessJob 09:01:45.3751270 JobId: 98 
ProcessJob 09:01:45.7363757 JobId: 99 
ProcessJob 09:01:45.7809216 JobId: 100 
SendToDataBase 09:01:45.7809216 JobId: 88 
SendToDataBase 09:01:45.7879270 JobId: 89 
SendToDataBase 09:01:45.7925566 JobId: 90 
SendToDataBase 09:01:45.8776726 JobId: 91 
SendToDataBase 09:01:45.8776726 JobId: 92 
SendToDataBase 09:01:46.5813640 JobId: 93 
SendToDataBase 09:01:46.5813640 JobId: 94 
SendToDataBase 09:01:47.7407165 JobId: 97 
SendToDataBase 09:01:47.7407165 JobId: 98 
SendToDataBase 09:01:48.4382058 JobId: 99 
SendToDataBase 09:01:48.7357557 JobId: 100 

ответ

2

Исходного тела ответа стал слишком длинным

Edit4: реакция на OP Edit2 Я не уверен, какие именно изменения были сделанный для получения предоставленного результата, но вот ваш модифицированный источник и результаты, отображающие упорядоченное поведение для всех 100 входов.

using System; 
using System.Collections.Generic; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 
using System.Xml; 
using System.Linq; 

namespace OrderProcessing { 
    public class Job { 
     public string Path { get; set; } 

     public XmlDocument Document { get; set; } 

     public List<Object> BusinessObjects { get; set; } 

     public int ReturnCode { get; set; } 

     public int ID { get; set; } 
    } 

    public class Test { 
     ITargetBlock<Job> pathBlock = null; 

     CancellationTokenSource cancellationTokenSource; 

     Random rnd = new Random(); 

     private bool ReadDocument(Job job) { 
      Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} JobId: {job.ID}"); 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      // Throw OperationCanceledException if cancellation is requested. 
      cancellationTokenSource.Token.ThrowIfCancellationRequested(); 

      // Read the document 
      job.Document = new XmlDocument(); 

      // Some checking 
      return true; 
     } 

     private bool ValidateXml(Job job) { 
      Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} JobId: {job.ID}"); 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      // Throw OperationCanceledException if cancellation is requested. 
      cancellationTokenSource.Token.ThrowIfCancellationRequested(); 

      // Check XML against XSD and perform remaining checks 
      job.BusinessObjects = new List<object>(); 

      // Just for tests 
      job.BusinessObjects.Add(new object()); 
      job.BusinessObjects.Add(new object()); 

      // Parse Xml and create business objects 
      return true; 
     } 

     private bool ProcessJob(Job job) { 
      Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} JobId: {job.ID}"); 

      // Throw OperationCanceledException if cancellation is requested. 
      cancellationTokenSource.Token.ThrowIfCancellationRequested(); 

      Parallel.ForEach(job.BusinessObjects, bO => { 
       ImportObject(bO); 
      }); 


      // Import the job 
      return true; 
     } 

     private object ImportObject(object o) { 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      return new object(); 
     } 

     private void CreateResponse(Job job) { 
      if (job.ReturnCode == 100) { 
       Console.WriteLine($"CreateResponse {DateTime.Now.TimeOfDay} JobId: {job.ID}"); 

      } 
      else { 
       Console.WriteLine("ID {0} failed to import.", job.ID); 
      } 

      // Create response XML with returncodes 
     } 

     ITargetBlock<Job> CreateJobProcessingPipeline() { 
      var loadXml = new TransformBlock<Job, Job>(job => { 
       try { 
        if (ReadDocument(job)) { 
         // For later error handling 
         job.ReturnCode = 100; // success 
        } 
        else { 
         job.ReturnCode = 200; 
        } 

        return job; 
       } 
       catch (OperationCanceledException) { 
        job.ReturnCode = 300; 
        return job; 
       } 
      }, TransformBlockOptions()); 

      var validateXml = new TransformBlock<Job, Job>(job => { 
       try { 
        if (ValidateXml(job)) { 
         // For later error handling 
         job.ReturnCode = 100; 
        } 
        else { 
         job.ReturnCode = 200; 
        } 

        return job; 
       } 
       catch (OperationCanceledException) { 
        job.ReturnCode = 300; 
        return job; 
       } 
      }, TransformBlockOptions()); 


      var importJob = new TransformBlock<Job, Job>(job => { 
       try { 
        if (ProcessJob(job)) { 
         // For later error handling 
         job.ReturnCode = 100; // success 
        } 
        else { 
         job.ReturnCode = 200; 
        } 

        return job; 
       } 
       catch (OperationCanceledException) { 
        job.ReturnCode = 300; 
        return job; 
       } 
      }, TransformBlockOptions()); 

      var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions()); 
      var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions()); 
      var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions()); 

      // 
      // Connect the pipeline 
      // 
      loadXml.LinkTo(validateXml, job => job.ReturnCode == 100); 
      loadXml.LinkTo(loadingFailed); 

      validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100); 
      validateXml.LinkTo(validationFailed); 

      //importJob.LinkTo(reportImport); 

      var output = importJob.AsObservable(); 
      var subscription = output.Subscribe(x => { 
      if (x.ReturnCode == 100) { 
       //job success 
       Console.WriteLine($"SendToDataBase {DateTime.Now.TimeOfDay} JobId: {x.ID}"); 
      } 
      else { 
       //handle fault 
       Console.WriteLine($"Job Failed {DateTime.Now.TimeOfDay} JobId: {x.ID}"); 
      }     
     }); 

      // Return the head of the network. 
      return loadXml; 
     } 

     public void Start() { 
      cancellationTokenSource = new CancellationTokenSource(); 

      pathBlock = CreateJobProcessingPipeline(); 
     } 

     public async void AddJob(string path, int id) { 
      Job j = new Job(); 
      j.Path = path; 
      j.ID = id; 

      await pathBlock.SendAsync(j); 
     } 

     static ExecutionDataflowBlockOptions TransformBlockOptions() { 
      return new ExecutionDataflowBlockOptions { 
       MaxDegreeOfParallelism = 8, 
       BoundedCapacity = 32 
      }; 
     } 

     private static ExecutionDataflowBlockOptions ActionBlockOptions() { 
      return new ExecutionDataflowBlockOptions { 
       MaxDegreeOfParallelism = 1, 
       BoundedCapacity = 1 
      }; 
     } 

     public void Cancel() { 
      if (cancellationTokenSource != null) 
       cancellationTokenSource.Cancel(); 
     } 
    } 

    class Program { 
     private static String InputXml = @"C:\XML\Part.xml"; 
     private static Test _Pipeline; 

     static void Main(string[] args) { 
      _Pipeline = new Test(); 
      _Pipeline.Start(); 


      var data = Enumerable.Range(1, 100); 

      foreach (var d in data) 
       _Pipeline.AddJob(InputXml, d); 

      //Wait before closing the application so we can see the results. 
      Console.ReadLine(); 
     } 
    } 
} 

Результаты

Function,Timestamp,Other,JobId 
ReadDocument,08:11:27.2200011,JobId:,1 
ReadDocument,08:11:27.2240007,JobId:,2 
ReadDocument,08:11:29.7562763,JobId:,3 
ReadDocument,08:11:29.7662792,JobId:,4 
ReadDocument,08:11:30.7013793,JobId:,5 
ReadDocument,08:11:31.7024931,JobId:,6 
ReadDocument,08:11:31.7034925,JobId:,7 
ReadDocument,08:11:32.7306060,JobId:,9 
ReadDocument,08:11:32.7306060,JobId:,8 
ReadDocument,08:11:33.7027033,JobId:,10 
ReadDocument,08:11:33.7027033,JobId:,11 
ReadDocument,08:11:34.7018217,JobId:,12 
ReadDocument,08:11:34.7028153,JobId:,13 
ReadDocument,08:11:35.7019214,JobId:,14 
ReadDocument,08:11:35.7069235,JobId:,15 
ReadDocument,08:11:35.7069235,JobId:,16 
ReadDocument,08:11:35.7069235,JobId:,17 
ReadDocument,08:11:35.7079221,JobId:,18 
ValidateXml,08:11:35.7119363,JobId:,1 
ValidateXml,08:11:36.7060334,JobId:,2 
ReadDocument,08:11:36.7060334,JobId:,19 
ReadDocument,08:11:36.7070332,JobId:,20 
ReadDocument,08:11:37.7071383,JobId:,21 
ReadDocument,08:11:37.7071383,JobId:,22 
ReadDocument,08:11:37.7081392,JobId:,23 
ValidateXml,08:11:37.7091421,JobId:,3 
ReadDocument,08:11:38.7032496,JobId:,24 
ValidateXml,08:11:38.7052496,JobId:,6 
ValidateXml,08:11:38.7042513,JobId:,4 
ReadDocument,08:11:38.7052496,JobId:,27 
ValidateXml,08:11:38.7042513,JobId:,5 
ReadDocument,08:11:38.7052496,JobId:,28 
ReadDocument,08:11:38.7042513,JobId:,26 
ReadDocument,08:11:38.7032496,JobId:,25 
ValidateXml,08:11:39.7023545,JobId:,7 
ReadDocument,08:11:39.7023545,JobId:,29 
ValidateXml,08:11:39.7023545,JobId:,8 
ReadDocument,08:11:40.7064634,JobId:,30 
ReadDocument,08:11:40.7064634,JobId:,31 
ValidateXml,08:11:40.7084642,JobId:,9 
ValidateXml,08:11:41.7045755,JobId:,10 
ReadDocument,08:11:41.7085762,JobId:,33 
ValidateXml,08:11:41.7105750,JobId:,11 
ValidateXml,08:11:41.7115767,JobId:,12 
ValidateXml,08:11:41.7135740,JobId:,13 
ValidateXml,08:11:41.7155790,JobId:,14 
ReadDocument,08:11:41.7085762,JobId:,34 
ReadDocument,08:11:41.7045755,JobId:,32 
ReadDocument,08:11:41.7105750,JobId:,35 
ReadDocument,08:11:41.7135740,JobId:,36 
ReadDocument,08:11:42.7086844,JobId:,37 
ValidateXml,08:11:42.7116926,JobId:,15 
ValidateXml,08:11:42.7126878,JobId:,16 
ReadDocument,08:11:42.7116926,JobId:,38 
ValidateXml,08:11:43.7027911,JobId:,17 
ValidateXml,08:11:43.7027911,JobId:,18 
ValidateXml,08:11:43.7068030,JobId:,20 
ProcessJob,08:11:43.7097908,JobId:,1 
ValidateXml,08:11:43.7057897,JobId:,19 
ReadDocument,08:11:43.7057897,JobId:,39 
ReadDocument,08:11:43.7077893,JobId:,40 
ReadDocument,08:11:44.7038990,JobId:,41 
ProcessJob,08:11:44.7059002,JobId:,2 
ValidateXml,08:11:44.7049004,JobId:,21 
ReadDocument,08:11:44.7038990,JobId:,42 
ValidateXml,08:11:44.7059002,JobId:,22 
ReadDocument,08:11:44.7089023,JobId:,44 
ReadDocument,08:11:44.7049004,JobId:,43 
ReadDocument,08:11:45.7030090,JobId:,45 
ValidateXml,08:11:45.7030090,JobId:,23 
ValidateXml,08:11:45.7120179,JobId:,24 
ValidateXml,08:11:45.7120179,JobId:,25 
ReadDocument,08:11:45.7140087,JobId:,46 
ValidateXml,08:11:45.7170104,JobId:,26 
ReadDocument,08:11:45.7190107,JobId:,47 
ProcessJob,08:11:45.7200086,JobId:,3 
ValidateXml,08:11:45.7170104,JobId:,27 
ReadDocument,08:11:46.7071167,JobId:,48 
ReadDocument,08:11:46.7101161,JobId:,50 
ProcessJob,08:11:46.7111152,JobId:,4 
ValidateXml,08:11:46.7111152,JobId:,28 
ReadDocument,08:11:46.7071167,JobId:,49 
ValidateXml,08:11:47.7032249,JobId:,29 
ReadDocument,08:11:47.7062243,JobId:,51 
ReadDocument,08:11:47.7072261,JobId:,52 
ReadDocument,08:11:47.7092253,JobId:,53 
ProcessJob,08:11:47.7102243,JobId:,5 
ProcessJob,08:11:47.7112241,JobId:,7 
ReadDocument,08:11:47.7102243,JobId:,55 
ValidateXml,08:11:47.7062243,JobId:,30 
ProcessJob,08:11:47.7102243,JobId:,6 
ValidateXml,08:11:47.7072261,JobId:,31 
ReadDocument,08:11:47.7092253,JobId:,54 
ReadDocument,08:11:48.7063329,JobId:,56 
ProcessJob,08:11:48.7073331,JobId:,8 
ValidateXml,08:11:48.7063329,JobId:,32 
ValidateXml,08:11:48.7063329,JobId:,33 
ValidateXml,08:11:49.7074443,JobId:,34 
ReadDocument,08:11:49.7104422,JobId:,59 
ReadDocument,08:11:49.7124418,JobId:,60 
ProcessJob,08:11:49.7124418,JobId:,9 
ValidateXml,08:11:49.7144433,JobId:,36 
ValidateXml,08:11:49.7114420,JobId:,35 
ReadDocument,08:11:49.7074443,JobId:,57 
ReadDocument,08:11:49.7084468,JobId:,58 
ValidateXml,08:11:50.7065604,JobId:,37 
ReadDocument,08:11:50.7095502,JobId:,61 
ProcessJob,08:11:50.7105504,JobId:,10 
ReadDocument,08:11:50.7115502,JobId:,63 
ValidateXml,08:11:50.7125515,JobId:,40 
ReadDocument,08:11:50.7105504,JobId:,62 
ValidateXml,08:11:50.7095502,JobId:,39 
ValidateXml,08:11:50.7075518,JobId:,38 
ReadDocument,08:11:50.7115502,JobId:,64 
ReadDocument,08:11:51.7076596,JobId:,65 
ReadDocument,08:11:51.7086597,JobId:,66 
ProcessJob,08:11:51.7116603,JobId:,13 
ProcessJob,08:11:51.7106605,JobId:,12 
ProcessJob,08:11:51.7086597,JobId:,11 
ValidateXml,08:11:51.7076596,JobId:,41 
SendToDataBase,08:11:51.7366672,JobId:,1 
SendToDataBase,08:11:51.7416631,JobId:,2 
SendToDataBase,08:11:51.7496646,JobId:,3 
CreateResponse,08:11:51.7546639,JobId:,56 
ValidateXml,08:11:52.7037712,JobId:,42 
ValidateXml,08:11:52.7037712,JobId:,43 
ValidateXml,08:11:52.7077662,JobId:,44 
ReadDocument,08:11:52.7107675,JobId:,69 
ProcessJob,08:11:52.7077662,JobId:,14 
ProcessJob,08:11:52.7077662,JobId:,15 
ProcessJob,08:11:52.7087683,JobId:,16 
ProcessJob,08:11:52.7087683,JobId:,17 
ValidateXml,08:11:52.7097669,JobId:,45 
ReadDocument,08:11:52.7097669,JobId:,67 
ValidateXml,08:11:52.7097669,JobId:,46 
ReadDocument,08:11:52.7107675,JobId:,68 
ValidateXml,08:11:53.7069300,JobId:,47 
ReadDocument,08:11:53.7078801,JobId:,70 
ValidateXml,08:11:53.7108792,JobId:,48 
SendToDataBase,08:11:53.7118774,JobId:,4 
SendToDataBase,08:11:53.7208818,JobId:,5 
SendToDataBase,08:11:53.7228802,JobId:,6 
SendToDataBase,08:11:53.7238781,JobId:,7 
SendToDataBase,08:11:53.7258800,JobId:,8 
ReadDocument,08:11:53.7118774,JobId:,73 
ReadDocument,08:11:53.7098805,JobId:,71 
ReadDocument,08:11:53.7118774,JobId:,72 
ValidateXml,08:11:54.7059933,JobId:,49 
ValidateXml,08:11:54.7069847,JobId:,50 
ValidateXml,08:11:54.7089874,JobId:,51 
CreateResponse,08:11:54.7109862,JobId:,41 
CreateResponse,08:11:54.7169842,JobId:,42 
SendToDataBase,08:11:54.7149888,JobId:,9 
SendToDataBase,08:11:54.7259874,JobId:,10 
SendToDataBase,08:11:54.7269883,JobId:,11 
ProcessJob,08:11:54.7119868,JobId:,18 
ReadDocument,08:11:54.7059933,JobId:,74 
ValidateXml,08:11:54.7109862,JobId:,53 
ProcessJob,08:11:54.7119868,JobId:,19 
ProcessJob,08:11:54.7129854,JobId:,20 
ValidateXml,08:11:54.7099852,JobId:,52 
ReadDocument,08:11:54.7129854,JobId:,76 
ReadDocument,08:11:54.7069847,JobId:,75 
ReadDocument,08:11:55.7090940,JobId:,77 
ReadDocument,08:11:55.7140926,JobId:,78 
ValidateXml,08:11:55.7140926,JobId:,54 
SendToDataBase,08:11:55.7180953,JobId:,12 
CreateResponse,08:11:55.7180953,JobId:,43 
ProcessJob,08:11:55.7180953,JobId:,21 
SendToDataBase,08:11:55.7230962,JobId:,13 
ValidateXml,08:11:55.7170947,JobId:,55 
ReadDocument,08:11:55.7160937,JobId:,79 
ReadDocument,08:11:55.7170947,JobId:,80 
ValidateXml,08:11:55.8111031,JobId:,57 
ReadDocument,08:11:55.8111031,JobId:,81 
ProcessJob,08:11:55.8451120,JobId:,22 
ProcessJob,08:11:56.1251577,JobId:,23 
ReadDocument,08:11:56.2531569,JobId:,82 
ReadDocument,08:11:56.3441756,JobId:,83 
ProcessJob,08:11:56.3571695,JobId:,24 
ValidateXml,08:11:56.3851785,JobId:,58 
ReadDocument,08:11:56.4061804,JobId:,84 
ValidateXml,08:11:56.6222012,JobId:,59 
CreateResponse,08:11:56.6222012,JobId:,49 
ProcessJob,08:11:56.9112320,JobId:,25 
ValidateXml,08:11:56.9412405,JobId:,60 
ProcessJob,08:11:57.0002533,JobId:,26 
ValidateXml,08:11:57.2352587,JobId:,61 
ProcessJob,08:11:57.4852908,JobId:,27 
ReadDocument,08:11:58.2093656,JobId:,85 
SendToDataBase,08:11:58.2163692,JobId:,14 
ReadDocument,08:11:58.2113664,JobId:,87 
SendToDataBase,08:11:58.2203645,JobId:,15 
SendToDataBase,08:11:58.2293743,JobId:,16 
SendToDataBase,08:11:58.2303706,JobId:,17 
SendToDataBase,08:11:58.2313662,JobId:,18 
SendToDataBase,08:11:58.2333692,JobId:,19 
SendToDataBase,08:11:58.2353681,JobId:,20 
SendToDataBase,08:11:58.2373688,JobId:,21 
SendToDataBase,08:11:58.2383671,JobId:,22 
SendToDataBase,08:11:58.2393673,JobId:,23 
ValidateXml,08:11:58.2123658,JobId:,63 
CreateResponse,08:11:58.2163692,JobId:,50 
CreateResponse,08:11:58.2543716,JobId:,51 
CreateResponse,08:11:58.2643699,JobId:,52 
CreateResponse,08:11:58.2663730,JobId:,53 
ProcessJob,08:11:58.2143646,JobId:,31 
ProcessJob,08:11:58.2123658,JobId:,29 
ReadDocument,08:11:58.2093656,JobId:,86 
ReadDocument,08:11:58.2123658,JobId:,88 
ProcessJob,08:11:58.2133656,JobId:,30 
ProcessJob,08:11:58.2103650,JobId:,28 
ValidateXml,08:11:58.2113664,JobId:,62 
ReadDocument,08:11:58.2123658,JobId:,89 
ValidateXml,08:11:58.2133656,JobId:,64 
ValidateXml,08:11:59.7055294,JobId:,65 
ReadDocument,08:11:59.7065300,JobId:,91 
ValidateXml,08:11:59.7065300,JobId:,66 
SendToDataBase,08:11:59.7115275,JobId:,24 
SendToDataBase,08:11:59.7195324,JobId:,25 
SendToDataBase,08:11:59.7205330,JobId:,26 
ProcessJob,08:11:59.7085277,JobId:,33 
ValidateXml,08:11:59.7085277,JobId:,68 
ReadDocument,08:11:59.7095263,JobId:,93 
ValidateXml,08:11:59.7085277,JobId:,67 
ReadDocument,08:11:59.7095263,JobId:,92 
ProcessJob,08:11:59.7095263,JobId:,34 
ProcessJob,08:11:59.7075275,JobId:,32 
ReadDocument,08:11:59.7055294,JobId:,90 
ValidateXml,08:11:59.7105265,JobId:,70 
ValidateXml,08:11:59.7095263,JobId:,69 
ReadDocument,08:11:59.7105265,JobId:,94 
ValidateXml,08:12:00.7146358,JobId:,71 
SendToDataBase,08:12:00.7176364,JobId:,27 
ReadDocument,08:12:00.7156372,JobId:,97 
ProcessJob,08:12:00.7146358,JobId:,35 
ProcessJob,08:12:00.7156372,JobId:,36 
ReadDocument,08:12:00.7146358,JobId:,95 
ReadDocument,08:12:00.7156372,JobId:,96 
ReadDocument,08:12:00.8616797,JobId:,98 
ValidateXml,08:12:00.8796565,JobId:,72 
ReadDocument,08:12:00.9066595,JobId:,99 
ReadDocument,08:12:00.9786697,JobId:,100 
ValidateXml,08:12:00.9866692,JobId:,73 
ProcessJob,08:12:01.0766830,JobId:,37 
ValidateXml,08:12:01.1176829,JobId:,74 
ProcessJob,08:12:01.1176829,JobId:,38 
ProcessJob,08:12:01.2167037,JobId:,39 
SendToDataBase,08:12:01.2167037,JobId:,28 
SendToDataBase,08:12:01.2216970,JobId:,29 
SendToDataBase,08:12:01.2236923,JobId:,30 
SendToDataBase,08:12:01.2246914,JobId:,31 
ValidateXml,08:12:01.2327001,JobId:,75 
ValidateXml,08:12:01.5447286,JobId:,76 
ProcessJob,08:12:01.6567738,JobId:,40 
ValidateXml,08:12:01.9347686,JobId:,77 
ProcessJob,08:12:02.2498041,JobId:,44 
ProcessJob,08:12:02.4448257,JobId:,45 
SendToDataBase,08:12:02.4458286,JobId:,32 
ValidateXml,08:12:02.5469861,JobId:,78 
ProcessJob,08:12:02.6268456,JobId:,46 
SendToDataBase,08:12:02.6278997,JobId:,33 
SendToDataBase,08:12:02.6378977,JobId:,34 
SendToDataBase,08:12:02.6398461,JobId:,35 
ValidateXml,08:12:02.6538506,JobId:,79 
ProcessJob,08:12:03.1399063,JobId:,47 
SendToDataBase,08:12:03.1489053,JobId:,36 
ValidateXml,08:12:03.2979184,JobId:,80 
ProcessJob,08:12:03.4959402,JobId:,48 
ValidateXml,08:12:03.6259629,JobId:,81 
ValidateXml,08:12:03.6769676,JobId:,82 
ProcessJob,08:12:03.7719693,JobId:,54 
ProcessJob,08:12:03.8519797,JobId:,55 
ProcessJob,08:12:03.9689901,JobId:,57 
SendToDataBase,08:12:04.0079945,JobId:,37 
SendToDataBase,08:12:04.0099953,JobId:,38 
SendToDataBase,08:12:04.0109931,JobId:,39 
SendToDataBase,08:12:04.0119941,JobId:,40 
ValidateXml,08:12:04.0299989,JobId:,84 
ValidateXml,08:12:04.0089966,JobId:,83 
ProcessJob,08:12:04.3350372,JobId:,58 
ValidateXml,08:12:04.6541474,JobId:,85 
ProcessJob,08:12:04.8791864,JobId:,59 
SendToDataBase,08:12:04.8791864,JobId:,44 
SendToDataBase,08:12:05.0252098,JobId:,45 
SendToDataBase,08:12:05.0757198,JobId:,46 
ProcessJob,08:12:05.0757198,JobId:,60 
ValidateXml,08:12:05.1527328,JobId:,86 
ProcessJob,08:12:05.1532325,JobId:,61 
ValidateXml,08:12:05.2762716,JobId:,87 
ValidateXml,08:12:05.3793706,JobId:,88 
ValidateXml,08:12:05.5953056,JobId:,89 
ValidateXml,08:12:05.6453136,JobId:,90 
ProcessJob,08:12:05.8313378,JobId:,62 
SendToDataBase,08:12:05.8313378,JobId:,47 
ValidateXml,08:12:06.1573930,JobId:,91 
ValidateXml,08:12:06.2043839,JobId:,92 
ProcessJob,08:12:06.4384015,JobId:,63 
SendToDataBase,08:12:06.4384015,JobId:,48 
ProcessJob,08:12:06.6554190,JobId:,64 
ProcessJob,08:12:06.7494355,JobId:,65 
SendToDataBase,08:12:06.7494355,JobId:,54 
SendToDataBase,08:12:06.7594308,JobId:,55 
SendToDataBase,08:12:06.7624294,JobId:,57 
ProcessJob,08:12:06.9254482,JobId:,66 
SendToDataBase,08:12:06.9254482,JobId:,58 
ValidateXml,08:12:07.0154624,JobId:,93 
ValidateXml,08:12:07.0975086,JobId:,94 
ProcessJob,08:12:07.1925138,JobId:,67 
ValidateXml,08:12:07.2724877,JobId:,95 
ProcessJob,08:12:07.6385268,JobId:,68 
ProcessJob,08:12:07.7705429,JobId:,69 
ValidateXml,08:12:07.8315476,JobId:,96 
ProcessJob,08:12:07.8905526,JobId:,70 
SendToDataBase,08:12:07.8905526,JobId:,59 
SendToDataBase,08:12:07.8965534,JobId:,60 
SendToDataBase,08:12:07.8975535,JobId:,61 
ValidateXml,08:12:08.1306009,JobId:,97 
ValidateXml,08:12:08.2065895,JobId:,98 
ValidateXml,08:12:08.3106332,JobId:,99 
ProcessJob,08:12:08.3296082,JobId:,71 
ValidateXml,08:12:08.4406159,JobId:,100 
ProcessJob,08:12:08.8396557,JobId:,72 
SendToDataBase,08:12:08.8446570,JobId:,62 
SendToDataBase,08:12:08.8806613,JobId:,63 
SendToDataBase,08:12:08.8946619,JobId:,64 
ProcessJob,08:12:09.0076746,JobId:,73 
SendToDataBase,08:12:09.0086763,JobId:,65 
ProcessJob,08:12:09.0996850,JobId:,74 
ProcessJob,08:12:09.1106847,JobId:,75 
SendToDataBase,08:12:09.1106847,JobId:,66 
SendToDataBase,08:12:09.1136860,JobId:,67 
ProcessJob,08:12:09.6547630,JobId:,76 
SendToDataBase,08:12:09.6557462,JobId:,68 
ProcessJob,08:12:09.9218032,JobId:,77 
ProcessJob,08:12:10.2218075,JobId:,78 
ProcessJob,08:12:10.4288308,JobId:,79 
SendToDataBase,08:12:10.4288308,JobId:,69 
SendToDataBase,08:12:10.4408307,JobId:,70 
SendToDataBase,08:12:10.4448318,JobId:,71 
ProcessJob,08:12:10.6858596,JobId:,80 
SendToDataBase,08:12:10.6858596,JobId:,72 
ProcessJob,08:12:11.4049481,JobId:,81 
ProcessJob,08:12:11.7039814,JobId:,82 
ProcessJob,08:12:11.8272054,JobId:,83 
ProcessJob,08:12:11.9930072,JobId:,84 
SendToDataBase,08:12:11.9930072,JobId:,73 
SendToDataBase,08:12:11.9979988,JobId:,74 
SendToDataBase,08:12:11.9989983,JobId:,75 
SendToDataBase,08:12:11.9989983,JobId:,76 
ProcessJob,08:12:12.3460366,JobId:,85 
ProcessJob,08:12:12.4520491,JobId:,86 
SendToDataBase,08:12:12.4520491,JobId:,77 
ProcessJob,08:12:12.8810952,JobId:,87 
ProcessJob,08:12:13.1443167,JobId:,88 
SendToDataBase,08:12:13.1443167,JobId:,78 
SendToDataBase,08:12:13.1471282,JobId:,79 
ProcessJob,08:12:13.2041414,JobId:,89 
SendToDataBase,08:12:13.2081302,JobId:,80 
SendToDataBase,08:12:13.2101309,JobId:,81 
ProcessJob,08:12:13.4381566,JobId:,90 
SendToDataBase,08:12:13.4392215,JobId:,82 
ProcessJob,08:12:13.6411889,JobId:,91 
SendToDataBase,08:12:13.6411889,JobId:,83 
ProcessJob,08:12:13.9472212,JobId:,92 
SendToDataBase,08:12:13.9472212,JobId:,84 
ProcessJob,08:12:14.3122494,JobId:,93 
ProcessJob,08:12:14.7053031,JobId:,94 
SendToDataBase,08:12:14.7053031,JobId:,85 
SendToDataBase,08:12:14.7092946,JobId:,86 
ProcessJob,08:12:14.9393634,JobId:,95 
ProcessJob,08:12:15.4103709,JobId:,96 
SendToDataBase,08:12:15.4113707,JobId:,87 
ProcessJob,08:12:15.9355263,JobId:,97 
ProcessJob,08:12:15.9724349,JobId:,98 
SendToDataBase,08:12:15.9724349,JobId:,88 
SendToDataBase,08:12:15.9774350,JobId:,89 
ProcessJob,08:12:15.9724349,JobId:,99 
SendToDataBase,08:12:15.9784371,JobId:,90 
SendToDataBase,08:12:15.9834330,JobId:,91 
ProcessJob,08:12:16.6175125,JobId:,100 
SendToDataBase,08:12:16.6175125,JobId:,92 
SendToDataBase,08:12:16.6555160,JobId:,93 
SendToDataBase,08:12:17.5005984,JobId:,94 
SendToDataBase,08:12:17.8846409,JobId:,95 
SendToDataBase,08:12:17.8886408,JobId:,96 
SendToDataBase,08:12:18.1186677,JobId:,97 
SendToDataBase,08:12:18.7557365,JobId:,98 
SendToDataBase,08:12:18.7567394,JobId:,99 
SendToDataBase,08:12:19.5488221,JobId:,100 

Редактировать Новой подписка будет либо отправить свои элементы в Db или изменяла неисправную работу таким образом вы выбрать.

Дополнительные ресурсы:

Stack Exchange Code Review

Dataflow Source

+0

Итак, я использовал ваш код, и я вижу две проблемы. Один из них заключается в том, что он передаст задания на CreateResponse, где это не должно из-за того, что 'BoundedCapacity' блокируется до 1 или 32 в параметрах. Задания должны идти только туда, если returnCode не равен 100. Если я установил его в Unbounded, выходы для «ProcessJob» не в порядке, но SendToDataBase есть, что заставляет меня думать, что он переупорядочивает текущий обработанный элемент, прежде чем возвращать его из «TransformBlock» или до тех пор, пока следующий элемент в порядке не завершится обработкой, но сама обработка внутри Блока не в порядке? – Peter

+0

@Peter В вашем модифицированном коде я перенаправил ваш окончательный вывод в поток, каждое задание обрабатывается по порядку в обработчике этого потока, который можно увидеть в подписке. Отмена и недостатки не были в центре внимания этого вопроса, и это все ваше отображение кода возврата. Я не могу сказать, что именно вы устанавливаете на «Без ограничений», но если вы устанавливаете «ActionBlock» с более чем «MaxDegreeOfParallelism» на неограниченную емкость, он будет обрабатывать ваши товары не в порядке. Посмотрите быстрый способ, который обрабатывает код возврата в потоке. – JSteward

+0

Также: _Outputs для ProcessJob_ не эквивалентны «WriteLine» ProcessJob. «TransformBlock» будет изменять порядок выходов перед передачей их потоку и/или следующему блоку.Задача «WriteLine» в процессе корректно выведена из строя, потому что элементы обрабатываются на этом шаге не в порядке, то есть параллельно, они затем переупорядочиваются этим «TransformBlock» и отправляются в следующий поток/блок. – JSteward

5

Это можно сделать, если вы связываете TransformBlock к ActionBlock.

Это проще всего продемонстрировать с помощью компилируемого консольного приложения.

Это приложение обрабатывает последовательность целых чисел, но вы можете заменить целые классы классом рабочего класса.

(я модифицировал этот код из утилиты, которую я написал, которая использует многопоточное сжатие файлов, используя относительно медленный алгоритм сжатия LZMA.Эта утилита должна последовательно считывать входные данные из файла, а затем передавать ее в блоках в очередь который обрабатывает данные, используя несколько потоков в любом порядке, и, наконец, выводит сжатые блоки его в очереди, который должен сохранить первоначальный порядок блоков данных)

образец кода:.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace Demo 
{ 
    class Program 
    { 
     public static void Main() 
     { 
      var data = Enumerable.Range(1, 100); 
      var task = Process(data); 

      Console.WriteLine("Waiting for task to complete"); 
      task.Wait(); 
      Console.WriteLine("Task complete."); 
     } 

     public static async Task Process(IEnumerable<int> data) 
     { 
      var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions()); 
      var writer = new ActionBlock<int>(block => write(block), actionBlockOptions()); 

      queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true }); 

      await enqueDataToProcessAndAwaitCompletion(data, queue); 

      await writer.Completion; 
     } 

     static int process(int block) 
     { 
      Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}"); 
      emulateWorkload(); 
      return -block; 
     } 

     static void write(int block) 
     { 
      Console.WriteLine("Output: " + block); 
     } 

     static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue) 
     { 
      await enqueueDataToProcess(data, queue); 
      queue.Complete(); 
     } 

     static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue) 
     { 
      foreach (var item in data) 
       await queue.SendAsync(item); 
     } 


     static ExecutionDataflowBlockOptions transformBlockOptions() 
     { 
      return new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 8, 
       BoundedCapacity = 32 
      }; 
     } 

     private static ExecutionDataflowBlockOptions actionBlockOptions() 
     { 
      return new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 1, 
       BoundedCapacity = 1 
      }; 
     } 

     static Random rng = new Random(); 
     static object locker = new object(); 

     static void emulateWorkload() 
     { 
      int delay; 

      lock (locker) 
      { 
       delay = rng.Next(250, 750); 
      } 

      Thread.Sleep(delay); 
     } 
    } 
} 

выход:

Waiting for task to complete 
Thread 8 is processing block 8 
Thread 5 is processing block 2 
Thread 6 is processing block 6 
Thread 4 is processing block 5 
Thread 7 is processing block 7 
Thread 10 is processing block 4 
Thread 9 is processing block 1 
Thread 3 is processing block 3 
Thread 3 is processing block 9 
Thread 8 is processing block 10 
Thread 5 is processing block 11 
Thread 6 is processing block 12 
Thread 9 is processing block 13 
Thread 10 is processing block 14 
Thread 7 is processing block 15 
Thread 8 is processing block 16 
Thread 4 is processing block 17 
Thread 5 is processing block 18 
Thread 3 is processing block 19 
Thread 9 is processing block 20 
Thread 8 is processing block 21 
Output: -1 
Output: -2 
Output: -3 
Output: -4 
Output: -5 
Output: -6 
Output: -7 
Output: -8 
Output: -9 
Output: -10 
Output: -11 
Output: -12 
Output: -13 
Thread 6 is processing block 22 
Thread 10 is processing block 23 
Output: -14 
Thread 7 is processing block 24 
Output: -15 
Output: -16 
Thread 6 is processing block 25 
Output: -17 
Thread 4 is processing block 26 
Thread 5 is processing block 27 
----------------->SNIP<----------------- 
Thread 10 is processing block 93 
Thread 8 is processing block 94 
Output: -83 
Thread 4 is processing block 95 
Output: -84 
Output: -85 
Output: -86 
Output: -87 
Thread 3 is processing block 96 
Output: -88 
Thread 6 is processing block 97 
Thread 5 is processing block 98 
Thread 10 is processing block 99 
Thread 9 is processing block 100 
Output: -89 
Output: -90 
Output: -91 
Output: -92 
Output: -93 
Output: -94 
Output: -95 
Output: -96 
Output: -97 
Output: -98 
Output: -99 
Output: -100 
Task complete. 
Press any key to continue . . . 

Обратите внимание, что «блоки» обрабатываются в любом порядке несколькими потоками, но порядок вывода совпадает с порядком ввода.

Это очень важно, чтобы установить параметры блока действия вывода согласно методу actionBlockOptions() с MaxDegreeOfParallelism и BoundedCapacity оба набора 1.

Это то, что вызывает выход сериализовать в правильном порядке. Если вы установили BoundedCapacity и MaxDegreeOfParallelism более чем на 1 для вывода, тогда это может быть выведено в неправильном порядке.

+0

Спасибо @Matthew Уотсон. Насколько я понял, я мог просто использовать ActionBlock для вывода всех файлов, которые были прочитаны (в том порядке, в котором они вошли) и отправить его в ту же комбинацию BufferBlock/TransformBlock/ActionBlock или вы видите проблему с цепочкой этих блоков ? – Peter

+0

@Peter Я думаю, что это должно работать нормально. Это похоже на то, как работает мой файл. –

5

@Matthew Watson имеет хорошее предложение. Я просто хочу добавить, что нет необходимости ограничивать конечный блок действий с помощью MaxDegreeOfParallelism и BoundedCapacity до 1, если вы не используете пакет Microsoft.Tpl.Dataflow. Более новый и правильный, System.Threading.Tasks.Dataflow добавляет свойство EnsureOrdered к параметрам блока выполнения. Хотя это, похоже, не задокументировано в MSDN, вы можете найти это свойство и его использование в TPL Dataflow Source.

Вот пример и тест, демонстрирующий это поведение, изменение параметра EnsureOrdered в вариантах выполнения на значение false приведет к сбою теста. Значение по умолчанию - true и не обязательно должно быть явно задано для упорядоченного поведения.

Edit: Как было отмечено ниже, @Matthew Уотсоном, в то время как EnsureOrdered держит вещи заказанных между пропагаторными блоками, один раз в сообщениях блока действий может быть обработан в любом порядке.

Edit2: Примечание: если ActionBlock имеет MaxDegreeOfParllelism и BoundedCapacity набор к одному, но EnsureOrdered ложно, тест потерпит неудачу, и результаты будут в порядке.

[TestFixture] 
public class TestRunner { 

    [Test] 
    public void TestPipeline() { 
     var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList(); 

     var target = new MyDataflow(); 
     target.PostData(data).Wait(); 

     Assert.IsTrue(data.SequenceEqual(target.OutputMessages)); 
    } 
} 

public class MyDataflow { 

    private static Random rnd = new Random(); 

    private BufferBlock<Message> buffer; 
    private TransformBlock<Message, Message> xForm1; 
    private ActionBlock<Message> action; 
    public IList<Message> OutputMessages { get; set; } 

    public MyDataflow() { 
     OutputMessages = new List<Message>(); 
     CreatePipeline(); 
     LinkPipeline(); 
    } 

    public void CreatePipeline() { 
     var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 2, 
      MaxDegreeOfParallelism = 10, 
      EnsureOrdered = true 
     }; 

     buffer = new BufferBlock<Message>(); 

     xForm1 = new TransformBlock<Message, Message>(x => { 
      Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}"); 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 
      Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}"); 
      return x; 
     }, options); 

     action = new ActionBlock<Message>(x => { 
      Console.WriteLine($"{DateTime.Now.TimeOfDay} - Output Id: {x.Id} Value: {x.Value}"); 

      //this delay will cause the messages to be unordered 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 

      OutputMessages.Add(x); 
     }, options); 
    } 

    public void LinkPipeline() { 
     var options = new DataflowLinkOptions() { 
      PropagateCompletion = true 
     }; 

     buffer.LinkTo(xForm1, options); 
     xForm1.LinkTo(action, options); 
    } 

    public Task PostData(IEnumerable<Message> data) { 

     foreach (var item in data) { 
      buffer.Post(item); 
     } 
     buffer.Complete(); 
     return action.Completion; 
    } 
} 

public class Message { 
    public Message(int id, int value) { 
     this.Id = id; 
     this.Value = value; 
    } 
    public int Id { get; set; } 
    public int Value { get; set; } 
} 

Edit: К сожалению, мы не можем непосредственно получить доступ к внутренним ReorderingBuffer. Таким образом, альтернативой с BoundedCapacity10 и MaxDegreeOfParallelism, равным единице, было бы привязать упорядоченный вывод TransformBlock к потоку. Обратите внимание, что в приведенном выше коде задержка в разрешенной параллели ActionBlock приводит к тому, что результат не соответствует порядку, но в коде ниже задержка в обработке потока не будет нарушать порядок. По существу, обеспечивая такое же поведение, как синхронное ActionBlock и может кормить другую часть сетки и т.д.

[TestFixture] 
public class TestRunner { 

    [Test] 
    public void TestPipeline() { 
     var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList(); 

     var target = new MyDataflow(); 
     target.PostData(data).Wait(); 

     Assert.IsTrue(data.SequenceEqual(target.OutputMessages)); 
    } 
} 

public class MyDataflow { 

    private static Random rnd = new Random(); 

    private BufferBlock<Message> buffer; 
    private TransformBlock<Message, Message> xForm1; 
    private IObservable<Message> output; 
    private TaskCompletionSource<bool> areWeDoneYet; 
    public IList<Message> OutputMessages { get; set; } 

    public MyDataflow() { 
     OutputMessages = new List<Message>(); 
     CreatePipeline(); 
     LinkPipeline(); 
    } 

    public void CreatePipeline() { 
     var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 13, 
      MaxDegreeOfParallelism = 10, 
      EnsureOrdered = true 
     }; 

     buffer = new BufferBlock<Message>(); 

     xForm1 = new TransformBlock<Message, Message>(x => { 
      Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}"); 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 
      Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}"); 
      return x; 
     }, options); 

     output = xForm1.AsObservable<Message>(); 

     areWeDoneYet = new TaskCompletionSource<bool>(); 
    } 

    public void LinkPipeline() { 
     var options = new DataflowLinkOptions() { 
      PropagateCompletion = true 
     }; 

     buffer.LinkTo(xForm1, options); 

     var subscription = output.Subscribe(msg => { 
      Task.Delay(rnd.Next(1000, 3000)).Wait(); 
      OutputMessages.Add(msg); 
     },() => areWeDoneYet.SetResult(true));    
    } 

    public Task<bool> PostData(IEnumerable<Message> data) {    
     foreach (var item in data) { 
      buffer.Post(item); 
     } 
     buffer.Complete(); 
     return areWeDoneYet.Task; 
    } 
} 

public class Message { 
    public Message(int id, int value) { 
     this.Id = id; 
     this.Value = value; 
    } 
    public int Id { get; set; } 
    public int Value { get; set; } 
} 

Edit2: Кроме того, мой трубопровод должен иметь 3 из этих этапов, как я мог связать их?Поэтому, когда первый блок выполняется с первым файлом, он начинает переносить данные в следующий блок, который будет работать параллельно и снова асинхронно.

Это не связано с тем, как они связаны, но в ExecutionDataflowBlockOptions. С приведенными ниже параметрами первый блок будет запускать задачи в соответствии с количеством отправленных файлов и их временем обработки, по завершении они будут выводиться либо на следующий этап обработки, либо на обработку отказа ActionBlock на основе вашего предиката Job.ReturnCode , и то же самое будет следовать для следующих этапов. Вы также можете изменить свои параметры ActionBlock для обработки нескольких удачных/неудачных попыток с вашего TransformBlocks.

var options = new ExecutionDataflowBlockOptions() { 
      BoundedCapacity = 10, 
      MaxDegreeOfParallelism = 10, 
      EnsureOrdered = true 
     }; 
var loadXml = new TransformBlock<Job, Job>(job => { ... }, options); // I/O 
var validateData = new TransformBlock<Job, Job>(job => { ... }, options); // Parsing&Validating&Calculations 
var importJob = new TransformBlock<Job, Job>(job => { ... }, options); // Saving to database 

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job)); 
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job)); 
var reportImport = new ActionBlock<Job>(job => CreateResponse(job)); 

loadXml.LinkTo(validateData, job => job.ReturnCode == 100); 
loadXml.LinkTo(loadingFailed); 

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100); 
validateData.LinkTo(validationFailed); 

importJob.LinkTo(reportImport); 

Edit3 В ответ на добавленную исходный код OP в: Ваш проигрышные заказанный поведение в ваш последний блок преобразования, установив MaxDegreeOfParallelism и BoundedCapcity к 1. Позвольте мне быть ясно не делать, что «обеспечить заказ» это только борьба с библиотекой. Вот соответствующий отрывок из TransformBlock:

  // If parallelism is employed, we will need to support reordering messages that complete out-of-order. 
      // However, a developer can override this with EnsureOrdered == false. 
      if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered) 
      { 
       _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message)); 
      } 

Вот пробег с 20 точек данных, сделанных с кодом модифицированного использовать параллелизм в окончательном TBlock. Модифицированный к базовому CSV для рассмотрения в Excel, то есть заменить «» => «» :)

Function,TimeStamp/Inserted JobId,Other,Other,Other,Other,Other,Other,Other,JobId From functions 
ReadDocument,04:54.0,|,Thread,6,is,processing,Job,Id:,1 
ReadDocument,04:54.0,|,Thread,11,is,processing,Job,Id:,2 
ReadDocument,04:56.0,|,Thread,13,is,processing,Job,Id:,3 
ReadDocument,04:56.0,|,Thread,6,is,processing,Job,Id:,4 
ReadDocument,04:57.0,|,Thread,11,is,processing,Job,Id:,5 
ReadDocument,04:57.0,|,Thread,14,is,processing,Job,Id:,6 
ReadDocument,04:58.0,|,Thread,15,is,processing,Job,Id:,7 
ReadDocument,04:58.0,|,Thread,6,is,processing,Job,Id:,8 
ReadDocument,04:59.0,|,Thread,11,is,processing,Job,Id:,9 
ReadDocument,04:59.0,|,Thread,16,is,processing,Job,Id:,10 
ReadDocument,05:00.0,|,Thread,17,is,processing,Job,Id:,12 
ReadDocument,05:00.0,|,Thread,15,is,processing,Job,Id:,11 
ReadDocument,05:01.0,|,Thread,16,is,processing,Job,Id:,13 
ReadDocument,05:01.0,|,Thread,18,is,processing,Job,Id:,14 
ReadDocument,05:02.0,|,Thread,15,is,processing,Job,Id:,15 
ReadDocument,05:02.0,|,Thread,17,is,processing,Job,Id:,20 
ValidateXml,05:02.0,|,Thread,19,is,processing,Job,Id:,1 
ReadDocument,05:02.0,|,Thread,14,is,processing,Job,Id:,17 
ReadDocument,05:02.0,|,Thread,13,is,processing,Job,Id:,16 
ReadDocument,05:02.0,|,Thread,11,is,processing,Job,Id:,18 
ReadDocument,05:02.0,|,Thread,6,is,processing,Job,Id:,19 
ValidateXml,05:03.0,|,Thread,16,is,processing,Job,Id:,2 
ValidateXml,05:03.0,|,Thread,20,is,processing,Job,Id:,3 
ValidateXml,05:04.0,|,Thread,11,is,processing,Job,Id:,4 
ValidateXml,05:04.0,|,Thread,21,is,processing,Job,Id:,7 
ValidateXml,05:04.0,|,Thread,18,is,processing,Job,Id:,5 
ValidateXml,05:04.0,|,Thread,15,is,processing,Job,Id:,6 
ValidateXml,05:04.5,|,Thread,16,is,processing,Job,Id:,8 
ValidateXml,05:04.5,|,Thread,6,is,processing,Job,Id:,9 
ValidateXml,05:04.6,|,Thread,19,is,processing,Job,Id:,10 
ProcessJob,05:04.6,|,Thread,14,is,processing,Job,Id:,2 
ProcessJob,05:04.6,|,Thread,22,is,processing,Job,Id:,1 
ValidateXml,05:05.5,|,Thread,18,is,processing,Job,Id:,11 
ValidateXml,05:05.6,|,Thread,20,is,processing,Job,Id:,12 
ProcessJob,05:05.6,|,Thread,23,is,processing,Job,Id:,3 
ValidateXml,05:06.5,|,Thread,6,is,processing,Job,Id:,13 
ValidateXml,05:06.5,|,Thread,21,is,processing,Job,Id:,15 
ID,1,was,successfully,imported.,,,,, 
ValidateXml,05:06.5,|,Thread,16,is,processing,Job,Id:,14 
ValidateXml,05:06.8,|,Thread,15,is,processing,Job,Id:,17 
ProcessJob,05:06.8,|,Thread,24,is,processing,Job,Id:,4 
ValidateXml,05:06.8,|,Thread,11,is,processing,Job,Id:,16 
ProcessJob,05:06.8,|,Thread,22,is,processing,Job,Id:,5 
ProcessJob,05:07.5,|,Thread,17,is,processing,Job,Id:,6 
ProcessJob,05:07.5,|,Thread,25,is,processing,Job,Id:,8 
ValidateXml,05:07.5,|,Thread,19,is,processing,Job,Id:,18 
ProcessJob,05:07.5,|,Thread,14,is,processing,Job,Id:,7 
ValidateXml,05:08.5,|,Thread,16,is,processing,Job,Id:,19 
ProcessJob,05:08.5,|,Thread,23,is,processing,Job,Id:,9 
ValidateXml,05:08.5,|,Thread,18,is,processing,Job,Id:,20 
ProcessJob,05:09.5,|,Thread,19,is,processing,Job,Id:,10 
ID,2,was,successfully,imported.,,,,, 
ProcessJob,05:09.5,|,Thread,15,is,processing,Job,Id:,11 
ID,3,was,successfully,imported.,,,,, 
ProcessJob,05:10.6,|,Thread,14,is,processing,Job,Id:,12 
ProcessJob,05:10.9,|,Thread,25,is,processing,Job,Id:,13 
ProcessJob,05:11.0,|,Thread,24,is,processing,Job,Id:,14 
ID,4,was,successfully,imported.,,,,, 
ProcessJob,05:11.1,|,Thread,17,is,processing,Job,Id:,15 
ProcessJob,05:11.3,|,Thread,22,is,processing,Job,Id:,16 
ID,5,was,successfully,imported.,,,,, 
ID,6,was,successfully,imported.,,,,, 
ID,7,was,successfully,imported.,,,,, 
ID,8,was,successfully,imported.,,,,, 
ProcessJob,05:11.6,|,Thread,19,is,processing,Job,Id:,17 
ProcessJob,05:11.7,|,Thread,23,is,processing,Job,Id:,18 
ID,9,was,successfully,imported.,,,,, 
ID,10,was,successfully,imported.,,,,, 
ProcessJob,05:12.0,|,Thread,14,is,processing,Job,Id:,19 
ProcessJob,05:12.4,|,Thread,15,is,processing,Job,Id:,20 
ID,11,was,successfully,imported.,,,,, 
ID,12,was,successfully,imported.,,,,, 
ID,13,was,successfully,imported.,,,,, 
ID,14,was,successfully,imported.,,,,, 
ID,15,was,successfully,imported.,,,,, 
ID,16,was,successfully,imported.,,,,, 
ID,17,was,successfully,imported.,,,,, 
ID,18,was,successfully,imported.,,,,, 
ID,19,was,successfully,imported.,,,,, 
ID,20,was,successfully,imported.,,,,, 

Одно последнее замечание: функции, возвращающие bool исключения успеха и картографической вернуть коды могут быть проблемное, то есть однако вне сферы охвата этого вопроса. Вы можете получить много хороших советы по передовой практике, разместив код на Stack Exchange Code Review

+1

Я пробовал это, но на самом деле он не работает (т.результаты не находятся в правильном порядке), если вы увеличиваете 'BoundedCapacity' до, скажем, 8. Кажется, что подходит для меньших значений' BoundedCapacity', но факт, что он не работает должным образом для более высоких значений, кажется, указывает, что 'EnsureOrdered 'не делает точно, что мы думаем, что это делает ... –

+1

@Matthew Watson Отличное место, при дальнейшем осмотре появляется EnsureOrdered имеет значение только между блоками. Последний ActionBlock или любой ActionBlock игнорирует этот параметр. Несколько заметок: я получил неупорядоченное поведение для небольших значений BoundedCapacity, вставив задержку в блок действий. Таким образом, хотя сообщения оставляют TransformBlock в правильном порядке, они могут обрабатываться ActionBlock в любом порядке. Может быть, мы могли бы просто открыть внутренний ReorderingBuffer на tBlock, и мы все установили;) – JSteward

+0

Спасибо @JSteward. Попытка понять прямо сейчас. 3 вопроса. Добавляя поток данных TPL от менеджера nuget, я должен выбрать «System.Threading.Tasks.Dataflow» вместо «Microsoft.Tpl.Dataflow», который рекомендуется использовать в MSDN, почему? Кроме того, у моего конвейера должно быть 3 таких этапа, как я могу связать их? Итак, когда первый блок выполняется с первым файлом, он начинает переносить данные в следующий блок, который будет работать параллельно и снова асинхронно, при чтении других файлов? Одна вещь, функция «Подписаться» принимает только один аргумент в соответствии с моим VS? – Peter

 Смежные вопросы

  • Нет связанных вопросов^_^