Я пытаюсь создать службу, которая предоставляет очередь для многих асинхронных клиентов для выполнения запросов и ожидания ответа. Мне нужно уметь обрабатывать обработку очереди по X-запросам на Y-длительность. Например: 50 запросов в секунду. Это для сторонней службы REST, где я могу только выдавать X-запросы в секунду.Async Producer/Consumer с дроссельной продолжительностью и потребляемым количеством пакетов
Нашел много вопросов, это привело меня к пути использования потока данных TPL, я использовал TranformBlock для предоставления моего настраиваемого дросселирования, а затем X-номер ActionBlocks для выполнения задач параллельно. Реализация Действия кажется немного неуклюжей, поэтому задаюсь вопросом, есть ли лучший способ для меня передать Задачи в конвейер, который уведомляет вызывающих абонентов после их завершения.
Мне интересно, есть ли лучший или более оптимальный/более простой способ делать то, что я хочу? Есть ли вопиющие проблемы с моей реализацией? Я знаю, что у него нет отмены и обработки исключений, и я буду делать это дальше, но ваши комментарии наиболее приветствуются.
У меня есть Extended Stephen Cleary's example for my Dataflow pipeline и использовано
svick's concept of a time throttled TransformBlock. Мне интересно, можно ли легко создать то, что я построил, с чистым SemaphoreSlim design, его временным дросселированием с максимальными операциями, которые, я думаю, будут усложнять ситуацию.
Вот последняя реализация. FIFO queue async queue, где я могу передавать пользовательские действия.
public class ThrottledProducerConsumer<T>
{
private class TimerState<T1>
{
public SemaphoreSlim Sem;
public T1 Value;
}
private BufferBlock<T> _queue;
private IPropagatorBlock<T, T> _throttleBlock;
private List<Task> _consumers;
private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval)
{
SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval);
return new TransformBlock<T1, T1>(async (x) =>
{
var sw = new Stopwatch();
sw.Start();
//Console.WriteLine($"Current count: {_sem.CurrentCount}");
await _sem.WaitAsync();
sw.Stop();
var now = DateTime.UtcNow;
var releaseTime = now.Add(Interval) - now;
//-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete
var tm = new Timer((s) => {
var state = (TimerState<T1>)s;
//Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
state.Sem.Release();
}, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds,
-1);
/*
Task.Delay(delay).ContinueWith((t)=>
{
Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem");
//_sem.Release();
});
*/
//Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}");
return x;
},
//new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
//
new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 });
}
public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1)
{
var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, };
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
//-- Create the Queue
_queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, });
//-- Create and link the throttle block
_throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval);
_queue.LinkTo(_throttleBlock, linkOptions);
//-- Create and link the consumer(s) to the throttle block
var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem);
_consumers = new List<Task>();
for (int i = 0; i < MaxConsumers; i++)
{
var consumer = new ActionBlock<T>(consumerAction, consumerOptions);
_throttleBlock.LinkTo(consumer, linkOptions);
_consumers.Add(consumer.Completion);
}
//-- TODO: Add some cancellation tokens to shut this thing down
}
/// <summary>
/// Default Consumer Action, just prints to console
/// </summary>
/// <param name="ItemToConsume"></param>
private void ConsumeItem(T ItemToConsume)
{
Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}");
}
public async Task EnqueueAsync(T ItemToEnqueue)
{
await this._queue.SendAsync(ItemToEnqueue);
}
public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue)
{
foreach (var item in ItemsToEnqueue)
{
await this._queue.SendAsync(item);
}
}
public async Task CompleteAsync()
{
this._queue.Complete();
await Task.WhenAll(_consumers);
Console.WriteLine($"All consumers completed {DateTime.UtcNow}");
}
}
Метод испытания
public class WorkItem<T>
{
public TaskCompletionSource<T> tcs;
//public T respone;
public string url;
public WorkItem(string Url)
{
tcs = new TaskCompletionSource<T>();
url = Url;
}
public override string ToString()
{
return $"{url}";
}
}
public static void TestQueue()
{
Console.WriteLine("Created the queue");
var defaultAction = new Action<WorkItem<String>>(async i => {
var taskItem = ((WorkItem<String>)i);
Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}");
//-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url);
await Task.Delay(5000);
taskItem.tcs.SetResult($"{taskItem.url}");
//Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}");
});
var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction);
var results = new List<Task>();
foreach (var no in Enumerable.Range(0, 20))
{
var workItem = new WorkItem<String>($"http://someurl{no}.com");
results.Add(queue.EnqueueAsync(workItem));
results.Add(workItem.tcs.Task);
results.Add(workItem.tcs.Task.ContinueWith(response =>
{
Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}");
}));
}
Task.WhenAll(results).Wait();
Console.WriteLine("All Work Items Have Been Processed");
}
Некоторые мысли: 1) Вы находитесь на C# и указываете веб-запросы; IIS очень многое делает это уже (включая дросселирование, хотя, по общему признанию, на стороне ввода, а не на выходе). 2) Возможно, что-то такое же просто, как ConcurrentQueue, это все, что вам нужно для обработки параллелизма, с помощью SemaphoreSlim для дросселирования. 3) Если вы в конечном итоге собираетесь масштабироваться на этом, в частности, на несколько запросов на обслуживание машин, шина обслуживания может быть лучшим вариантом? Однако дросселирование было бы более сложным. – sellotape
Почему бы не добавить еще один блок в конце очереди, который уведомит вызывающего абонента об обработанном запросе? – VMAtm
@VMAtm Я предполагаю, что это способ, однако я думал о передаче делегата действия или даже задачи в очередь, которая будет уведомлять вызывающего абонента по завершении. – Nicholas