Если вам нужно, чтобы конвейер оставался на всю жизнь приложения, а не только запрос, вы могли бы использовать статический класс для его хранения. В блоке действий необязательно требовать полного завершения. Другой вариант, зависящий от ваших потребностей, - это отделить приложение и конвейер обработки. Они могут быть отделены очередью сообщений базы данных или просто отдельными приложениями на стороне сервера.
@svick имеет хорошую точку, используя источник TaskCompletion, чтобы определить, когда конвейер завершил работу с определенным элементом. Собираем все вместе здесь быстрый образец, который может быть полезным:
public class Controller {
public async Task<int> PostToPipeline(int inputValue) {
var message = new MessageIn(inputValue);
MyPipeline.InputBuffer.Post(message);
return await message.Completion.Task;
}
}
public class MessageIn {
public MessageIn(int value) {
InputValue = value;
Completion = new TaskCompletionSource<int>();
}
public int InputValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public class MessageProcessed {
public int ProcessedValue { get; set; }
public TaskCompletionSource<int> Completion { get; set; }
}
public static class MyPipeline {
public static BufferBlock<MessageIn> InputBuffer { get; private set; }
private static TransformBlock<MessageIn, MessageProcessed> transform;
private static ActionBlock<MessageProcessed> action;
static MyPipeline() {
BuildPipeline();
LinkPipeline();
}
static void BuildPipeline() {
InputBuffer = new BufferBlock<MessageIn>();
transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 10
});
}
static void LinkPipeline() {
InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
}
static MessageProcessed TransformMessage(MessageIn message) {
return new MessageProcessed() {
ProcessedValue = message.InputValue++,
Completion = message.Completion
};
}
static void CompletedProcessing(MessageProcessed message) {
message.Completion.SetResult(message.ProcessedValue);
}
}
Там несколько способов координатные завершения конкретной работы в трубопроводе; ожидание источника завершения может быть лучшим подходом для ваших нужд.
Да, я мог бы SendAsync/Опубликовать в блок и вернуться, не дожидаясь завершения, но это вызывает проблемы с дизайном, когда я действительно хочу знать «возвращенный объект». Так что в идеале я хотел бы иметь асинхронное завершение, не закрывая весь канал, он должен ждать вечно за доход без повторной инициализации. – mike00
Отлично! Красивое решение моей проблемы. Большое спасибо! – mike00