1

Я пытаюсь создать службу, которая предоставляет очередь для многих асинхронных клиентов для выполнения запросов и ожидания ответа. Мне нужно уметь обрабатывать обработку очереди по X-запросам на Y-длительность. Например: 50 запросов в секунду. Это для сторонней службы REST, где я могу только выдавать X-запросы в секунду.Async Producer/Consumer с дроссельной продолжительностью и потребляемым количеством пакетов

Нашел много вопросов, это привело меня к пути использования потока данных TPL, я использовал TranformBlock для предоставления моего настраиваемого дросселирования, а затем X-номер ActionBlocks для выполнения задач параллельно. Реализация Действия кажется немного неуклюжей, поэтому задаюсь вопросом, есть ли лучший способ для меня передать Задачи в конвейер, который уведомляет вызывающих абонентов после их завершения.

Мне интересно, есть ли лучший или более оптимальный/более простой способ делать то, что я хочу? Есть ли вопиющие проблемы с моей реализацией? Я знаю, что у него нет отмены и обработки исключений, и я буду делать это дальше, но ваши комментарии наиболее приветствуются.

У меня есть Extended Stephen Cleary's example for my Dataflow pipeline и использовано
svick's concept of a time throttled TransformBlock. Мне интересно, можно ли легко создать то, что я построил, с чистым SemaphoreSlim design, его временным дросселированием с максимальными операциями, которые, я думаю, будут усложнять ситуацию.

Вот последняя реализация. FIFO queue async queue, где я могу передавать пользовательские действия.

public class ThrottledProducerConsumer<T> 
{ 
    private class TimerState<T1> 
    { 
     public SemaphoreSlim Sem; 
     public T1 Value; 
    } 

    private BufferBlock<T> _queue; 
    private IPropagatorBlock<T, T> _throttleBlock; 
    private List<Task> _consumers; 

    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, Int32 MaxPerInterval) 
    { 
     SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval); 
     return new TransformBlock<T1, T1>(async (x) => 
     { 
      var sw = new Stopwatch(); 
      sw.Start(); 
      //Console.WriteLine($"Current count: {_sem.CurrentCount}"); 
      await _sem.WaitAsync(); 

      sw.Stop(); 
      var now = DateTime.UtcNow; 
      var releaseTime = now.Add(Interval) - now; 

      //-- Using timer as opposed to Task.Delay as I do not want to await or wait for it to complete 
      var tm = new Timer((s) => { 
       var state = (TimerState<T1>)s; 
       //Console.WriteLine($"RELEASE: {state.Value} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem"); 
       state.Sem.Release(); 

      }, new TimerState<T1> { Sem = _sem, Value = x }, (int)Interval.TotalMilliseconds, 
      -1); 

      /* 
      Task.Delay(delay).ContinueWith((t)=> 
      { 
       Console.WriteLine($"RELEASE(FAKE): {x} was released {DateTime.UtcNow:mm:ss:ff} Reset Sem"); 
       //_sem.Release(); 
      }); 
      */ 

      //Console.WriteLine($"{x} was tramsformed in {sw.ElapsedMilliseconds}ms. Will release {now.Add(Interval):mm:ss:ff}"); 
      return x; 
     }, 
      //new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); 
      // 
      new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 10 }); 
    } 

    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1) 
    { 
     var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, }; 
     var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, }; 

     //-- Create the Queue 
     _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, }); 

     //-- Create and link the throttle block 
     _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval); 
     _queue.LinkTo(_throttleBlock, linkOptions); 

     //-- Create and link the consumer(s) to the throttle block 
     var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem); 
     _consumers = new List<Task>(); 
     for (int i = 0; i < MaxConsumers; i++) 
     { 
      var consumer = new ActionBlock<T>(consumerAction, consumerOptions); 
      _throttleBlock.LinkTo(consumer, linkOptions); 
      _consumers.Add(consumer.Completion); 
     } 

     //-- TODO: Add some cancellation tokens to shut this thing down 
    } 

    /// <summary> 
    /// Default Consumer Action, just prints to console 
    /// </summary> 
    /// <param name="ItemToConsume"></param> 
    private void ConsumeItem(T ItemToConsume) 
    { 
     Console.WriteLine($"Consumed {ItemToConsume} at {DateTime.UtcNow}"); 
    } 

    public async Task EnqueueAsync(T ItemToEnqueue) 
    { 
     await this._queue.SendAsync(ItemToEnqueue); 
    } 

    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue) 
    { 
     foreach (var item in ItemsToEnqueue) 
     { 
      await this._queue.SendAsync(item); 
     } 
    } 

    public async Task CompleteAsync() 
    { 
     this._queue.Complete(); 
     await Task.WhenAll(_consumers); 
     Console.WriteLine($"All consumers completed {DateTime.UtcNow}"); 
    } 
} 

Метод испытания

public class WorkItem<T> 
    { 
     public TaskCompletionSource<T> tcs; 
     //public T respone; 
     public string url; 
     public WorkItem(string Url) 
     { 
      tcs = new TaskCompletionSource<T>(); 
      url = Url; 
     } 
     public override string ToString() 
     { 
      return $"{url}"; 
     } 
    } 

    public static void TestQueue() 
    { 
     Console.WriteLine("Created the queue"); 

     var defaultAction = new Action<WorkItem<String>>(async i => { 
      var taskItem = ((WorkItem<String>)i); 
      Console.WriteLine($"Consuming: {taskItem.url} {DateTime.UtcNow:mm:ss:ff}"); 
      //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url); 
      await Task.Delay(5000); 
      taskItem.tcs.SetResult($"{taskItem.url}"); 
      //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}"); 
     }); 

     var queue = new ThrottledProducerConsumer<WorkItem<String>>(TimeSpan.FromMilliseconds(2000), 5, 2, defaultAction); 

     var results = new List<Task>(); 
     foreach (var no in Enumerable.Range(0, 20)) 
     { 
      var workItem = new WorkItem<String>($"http://someurl{no}.com"); 
      results.Add(queue.EnqueueAsync(workItem)); 
      results.Add(workItem.tcs.Task); 
      results.Add(workItem.tcs.Task.ContinueWith(response => 
      { 
       Console.WriteLine($"Received: {response.Result} {DateTime.UtcNow:mm:ss:ff}"); 
      })); 
     } 

     Task.WhenAll(results).Wait(); 
     Console.WriteLine("All Work Items Have Been Processed"); 
    } 
+1

Некоторые мысли: 1) Вы находитесь на C# и указываете веб-запросы; IIS очень многое делает это уже (включая дросселирование, хотя, по общему признанию, на стороне ввода, а не на выходе). 2) Возможно, что-то такое же просто, как ConcurrentQueue, это все, что вам нужно для обработки параллелизма, с помощью SemaphoreSlim для дросселирования. 3) Если вы в конечном итоге собираетесь масштабироваться на этом, в частности, на несколько запросов на обслуживание машин, шина обслуживания может быть лучшим вариантом? Однако дросселирование было бы более сложным. – sellotape

+0

Почему бы не добавить еще один блок в конце очереди, который уведомит вызывающего абонента об обработанном запросе? – VMAtm

+0

@VMAtm Я предполагаю, что это способ, однако я думал о передаче делегата действия или даже задачи в очередь, которая будет уведомлять вызывающего абонента по завершении. – Nicholas

ответ

1

С просьбой, я создал класс ThrottledConsumerProducer, основанный на TPL DataFlow. Он тестировался в течение нескольких дней, включая конкурирующих производителей, которые были поставлены в очередь и завершены по порядку, около 281 тыс. Без каких-либо проблем, однако там я ошибся, которых я не обнаружил.

  1. Я использую BufferBlock как асинхронной очередь, это связано с:
  2. A TransformaBlock, который обеспечивает дросселирование и блокирования мне нужно. Он используется совместно с SempahoreSlim для управления максимальными запросами. Поскольку каждый элемент передается через блок, он увеличивает семафор и назначает задачу для продолжения X продолжительности, чтобы освободить семафор на единицу. Таким образом, у меня есть скользящее окно из X запросов на продолжительность; именно то, что я хотел. Из-за TPL я также подключаю параллелизм к подключенному:
  3. ActionBlock (s), которые отвечают за выполнение требуемой задачи.

Классы являются общими, поэтому могут быть полезны для других, если им нужно что-то подобное. Я не писал об аннулировании или обработке ошибок, но думал, что я должен просто отметить это как ответ, чтобы переместить его. Я был бы очень рад увидеть некоторые альтернативы и отзывы, а не отметить мой как принятый ответ. Спасибо за прочтение.

Примечание: я удалил таймер от первоначальной реализации, как это делает странные вещи вызывая семафор, чтобы освободить больше, чем максимум, я предполагаю, что это динамическая ошибку контекста, это произошло, когда я начал работать одновременные запросы. Я работал над ним, используя Task.Delay, чтобы запланировать выпуск блокировки семафора.

Throttled Производитель Потребитель

public class ThrottledProducerConsumer<T> 
{ 
    private BufferBlock<T> _queue; 
    private IPropagatorBlock<T, T> _throttleBlock; 
    private List<Task> _consumers; 

    private static IPropagatorBlock<T1, T1> CreateThrottleBlock<T1>(TimeSpan Interval, 
     Int32 MaxPerInterval, Int32 BlockBoundedMax = 2, Int32 BlockMaxDegreeOfParallelism = 2) 
    { 
     SemaphoreSlim _sem = new SemaphoreSlim(MaxPerInterval, MaxPerInterval); 
     return new TransformBlock<T1, T1>(async (x) => 
     { 
      //Log($"Transform blk: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count: {_sem.CurrentCount}"); 
      var sw = new Stopwatch(); 
      sw.Start(); 
      //Console.WriteLine($"Current count: {_sem.CurrentCount}"); 
      await _sem.WaitAsync(); 

      sw.Stop(); 
      var delayTask = Task.Delay(Interval).ContinueWith((t) => 
      { 
       //Log($"Pre-RELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphore Count {_sem.CurrentCount}"); 
       _sem.Release(); 
       //Log($"PostRELEASE: {x} {DateTime.UtcNow:mm:ss:ff} Semaphoere Count {_sem.CurrentCount}"); 
      }); 
      //},TaskScheduler.FromCurrentSynchronizationContext());     
      //Log($"Transformed: {x} in queue {sw.ElapsedMilliseconds}ms. {DateTime.Now:mm:ss:ff} will release {DateTime.Now.Add(Interval):mm:ss:ff} Semaphoere Count {_sem.CurrentCount}"); 
      return x; 
     }, 
      //-- Might be better to keep Bounded Capacity in sync with the semaphore 
      new ExecutionDataflowBlockOptions { BoundedCapacity = BlockBoundedMax, 
       MaxDegreeOfParallelism = BlockMaxDegreeOfParallelism }); 
    } 

    public ThrottledProducerConsumer(TimeSpan Interval, int MaxPerInterval, 
     Int32 QueueBoundedMax = 5, Action<T> ConsumerAction = null, Int32 MaxConsumers = 1, 
     Int32 MaxThrottleBuffer = 20, Int32 MaxDegreeOfParallelism = 10) 
    { 
     //-- Probably best to link MaxPerInterval and MaxThrottleBuffer 
     // and MaxConsumers with MaxDegreeOfParallelism 
     var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1, }; 
     var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, }; 

     //-- Create the Queue 
     _queue = new BufferBlock<T>(new DataflowBlockOptions { BoundedCapacity = QueueBoundedMax, }); 

     //-- Create and link the throttle block 
     _throttleBlock = CreateThrottleBlock<T>(Interval, MaxPerInterval); 
     _queue.LinkTo(_throttleBlock, linkOptions); 

     //-- Create and link the consumer(s) to the throttle block 
     var consumerAction = (ConsumerAction != null) ? ConsumerAction : new Action<T>(ConsumeItem); 
     _consumers = new List<Task>(); 
     for (int i = 0; i < MaxConsumers; i++) 
     { 
      var consumer = new ActionBlock<T>(consumerAction, consumerOptions); 
      _throttleBlock.LinkTo(consumer, linkOptions); 
      _consumers.Add(consumer.Completion); 
     } 

     //-- TODO: Add some cancellation tokens to shut this thing down 
    } 

    /// <summary> 
    /// Default Consumer Action, just prints to console 
    /// </summary> 
    /// <param name="ItemToConsume"></param> 
    private void ConsumeItem(T ItemToConsume) 
    { 
     Log($"Consumed {ItemToConsume} at {DateTime.UtcNow}"); 
    } 

    public async Task EnqueueAsync(T ItemToEnqueue) 
    { 
     await this._queue.SendAsync(ItemToEnqueue); 
    } 

    public async Task EnqueueItemsAsync(IEnumerable<T> ItemsToEnqueue) 
    { 
     foreach (var item in ItemsToEnqueue) 
     { 
      await this._queue.SendAsync(item); 
     } 
    } 

    public async Task CompleteAsync() 
    { 
     this._queue.Complete(); 
     await Task.WhenAll(_consumers); 
     Console.WriteLine($"All consumers completed {DateTime.UtcNow}"); 
    } 
    private static void Log(String messageToLog) 
    { 
     System.Diagnostics.Trace.WriteLine(messageToLog); 
     Console.WriteLine(messageToLog); 
    } 

} 

- Пример -

для типичного WorkItem

public class WorkItem<Toutput,Tinput> 
{ 
    private TaskCompletionSource<Toutput> _tcs; 
    public Task<Toutput> Task { get { return _tcs.Task; } } 

    public Tinput InputData { get; private set; } 
    public Toutput OutputData { get; private set; } 

    public WorkItem(Tinput inputData) 
    { 
     _tcs = new TaskCompletionSource<Toutput>(); 
     InputData = inputData; 
    } 

    public void Complete(Toutput result) 
    { 
     _tcs.SetResult(result); 
    } 

    public void Failed(Exception ex) 
    { 
     _tcs.SetException(ex); 
    } 

    public override string ToString() 
    { 
     return InputData.ToString(); 
    } 
} 

Создание блока действий выполняется в трубопроводе

private Action<WorkItem<Location,PointToLocation>> CreateProcessingAction() 
    { 
     return new Action<WorkItem<Location,PointToLocation>>(async i => { 
      var sw = new Stopwatch(); 
      sw.Start(); 

      var taskItem = ((WorkItem<Location,PointToLocation>)i); 
      var inputData = taskItem.InputData; 

      //Log($"Consuming: {inputData.Latitude},{inputData.Longitude} {DateTime.UtcNow:mm:ss:ff}"); 

      //-- Assume calling another async method e.g. await httpClient.DownloadStringTaskAsync(url); 
      await Task.Delay(500); 
      sw.Stop(); 
      Location outData = new Location() 
      { 
       Latitude = inputData.Latitude, 
       Longitude = inputData.Longitude, 
       StreetAddress = $"Consumed: {inputData.Latitude},{inputData.Longitude} Duration(ms): {sw.ElapsedMilliseconds}" 
      }; 
      taskItem.Complete(outData); 
      //Console.WriteLine($"Consumed: {taskItem.url} {DateTime.UtcNow}"); 
     }); 

    } 

Метод испытания Вы должны будете предоставить свою собственную реализацию для PointToLocation и местоположение. Просто пример того, как вы будете использовать его со своими собственными классами.

int startRange = 0; 
    int nextRange = 1000; 
    ThrottledProducerConsumer<WorkItem<Location,PointToLocation>> tpc; 
    private void cmdTestPipeline_Click(object sender, EventArgs e) 
    { 
     Log($"Pipeline test started {DateTime.Now:HH:mm:ss:ff}"); 

     if(tpc == null) 
     { 
      tpc = new ThrottledProducerConsumer<WorkItem<Location, PointToLocation>>(
       //1010, 2, 20000, 
       TimeSpan.FromMilliseconds(1010), 45, 100000, 
       CreateProcessingAction(), 
       2,45,10); 
     } 

     var workItems = new List<WorkItem<Models.Location, PointToLocation>>(); 
     foreach (var i in Enumerable.Range(startRange, nextRange)) 
     { 
      var ptToLoc = new PointToLocation() { Latitude = i + 101, Longitude = i + 100 }; 
      var wrkItem = new WorkItem<Location, PointToLocation>(ptToLoc); 
      workItems.Add(wrkItem); 


      wrkItem.Task.ContinueWith(t => 
      { 
       var loc = t.Result; 
       string line = $"[Simulated:{DateTime.Now:HH:mm:ss:ff}] - {loc.StreetAddress}"; 
       //txtResponse.Text = String.Concat(txtResponse.Text, line, System.Environment.NewLine); 
       //var lines = txtResponse.Text.Split(new string[] { System.Environment.NewLine}, 
       // StringSplitOptions.RemoveEmptyEntries).LongCount(); 

       //lblLines.Text = lines.ToString(); 
       //Log(line); 

      }); 
      //}, TaskScheduler.FromCurrentSynchronizationContext()); 

     } 

     startRange += nextRange; 

     tpc.EnqueueItemsAsync(workItems); 

     Log($"Pipeline test completed {DateTime.Now:HH:mm:ss:ff}"); 
    }