2017-01-02 9 views
2

Например, у меня есть три блока:TPL Dataflow труба постоянно работает внутри службы

 Buffer -> Transform -> Action 

Я бегу службы WebAPI приносит данные из запросов в блок буфера. Как создать такой канал, который будет работать вечно без вызова Completion() в блоке Action и остановки всего канала.

ответ

3

Если вам нужно, чтобы конвейер оставался на всю жизнь приложения, а не только запрос, вы могли бы использовать статический класс для его хранения. В блоке действий необязательно требовать полного завершения. Другой вариант, зависящий от ваших потребностей, - это отделить приложение и конвейер обработки. Они могут быть отделены очередью сообщений базы данных или просто отдельными приложениями на стороне сервера.

@svick имеет хорошую точку, используя источник TaskCompletion, чтобы определить, когда конвейер завершил работу с определенным элементом. Собираем все вместе здесь быстрый образец, который может быть полезным:

public class Controller { 

    public async Task<int> PostToPipeline(int inputValue) { 
     var message = new MessageIn(inputValue); 
     MyPipeline.InputBuffer.Post(message); 
     return await message.Completion.Task; 
    } 
} 

public class MessageIn { 
    public MessageIn(int value) { 
     InputValue = value; 
     Completion = new TaskCompletionSource<int>(); 
    } 

    public int InputValue { get; set; } 
    public TaskCompletionSource<int> Completion { get; set; } 
} 

public class MessageProcessed { 
    public int ProcessedValue { get; set; } 
    public TaskCompletionSource<int> Completion { get; set; } 
} 

public static class MyPipeline { 

    public static BufferBlock<MessageIn> InputBuffer { get; private set; } 
    private static TransformBlock<MessageIn, MessageProcessed> transform; 
    private static ActionBlock<MessageProcessed> action; 

    static MyPipeline() { 
     BuildPipeline(); 
     LinkPipeline(); 

    } 

    static void BuildPipeline() { 
     InputBuffer = new BufferBlock<MessageIn>(); 

     transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, 
      BoundedCapacity = 10 
     }); 

     action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, 
      BoundedCapacity = 10 
     }); 
    } 

    static void LinkPipeline() { 
     InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true }); 
     transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true }); 
    } 

    static MessageProcessed TransformMessage(MessageIn message) { 
     return new MessageProcessed() { 
      ProcessedValue = message.InputValue++, 
      Completion = message.Completion 
     }; 
    } 

    static void CompletedProcessing(MessageProcessed message) { 
     message.Completion.SetResult(message.ProcessedValue); 
    } 
} 

Там несколько способов координатные завершения конкретной работы в трубопроводе; ожидание источника завершения может быть лучшим подходом для ваших нужд.

+0

Да, я мог бы SendAsync/Опубликовать в блок и вернуться, не дожидаясь завершения, но это вызывает проблемы с дизайном, когда я действительно хочу знать «возвращенный объект». Так что в идеале я хотел бы иметь асинхронное завершение, не закрывая весь канал, он должен ждать вечно за доход без повторной инициализации. – mike00

+0

Отлично! Красивое решение моей проблемы. Большое спасибо! – mike00

4

Dataflow не имеет отличного решения для получения вывода конвейера для определенного ввода (поскольку он поддерживает не только простые конвейеры).

Что вы можете сделать, чтобы обойти это, создать TaskCompletionSource<T> и отправить его вместе с вводом в конвейер. Каждый блок в конвейере отправляет его в следующий блок, а последний вызывает его SetResult().

Код, который отправляет входной сигнал в конвейер, может затем awaitTaskCompletionSourceTask, чтобы дождаться выхода конвейера.

+0

Спасибо @svick за указание на это решение! – mike00