Просто уходящий от обзора высокого уровня, я думаю, было бы проще повторить в финале ActionBlock
. Ваш ActionBlock
отправит и подтвердит успех до окончания. В зависимости от объема данных этот подход позволяет вам закрутить столько ActionBlock
s, сколько необходимо, без излишней заботы о потерянных элементах, на самом деле их не должно быть. Если один или несколько экземпляров ActionBlock
не удалили свой пост, ваши объекты будут по-прежнему передаваться и распространяться в отношении того, как вы настроили свои буферы и очереди ввода, ожидая их шанса на проводе.
Редактировать Просто некоторые псевдо-код, но это будет иметь партию точек данных, то IEnumerable, и стараться размещать их в пять раз. Ограниченная емкость заставит каждый экземпляр обработчика поставить в очередь 1000 партий, а параллелизм будет распределять партии между блоками действий. Необязательно, Buffer
может быть добавлен до ActionBlock
для хранения всех входящих партий. Вам нужно будет позаботиться о том, чтобы ваш продюсер, поток, не сильно запустил ваш потребитель, службу REST.
public void ConfigureFinalActionBlock() {
var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
BoundedCapacity = DataflowBlockOptions.Unbounded
});
var options = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1000,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
var success = false;
var attempts = 0;
while (!success && attempts < 5) {
attempts++;
success = await MyApiPostAsync(data);
}
}, options);
dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
PropagateCompletion = true
});
Возьмем этот случай. Я получил буфер 100000 datapoints, и я заново его запускаю в actionblock. Только в следующую секунду я получу еще 100 000 данных с запросом на отправку actionblock. Поэтому я могу пропустить некоторые моменты. Как установить несколько блоков действий с входными очередями. – Abhay
Как вы можете пропустить какие-либо баллы? Буфер имеет неограниченную очередь, новые сообщения будут либо ждать обработанных, либо, если вы укажете максимальную степень параллелизма, будут выполняться одновременно в другом потоке. – VMAtm