2016-11-27 3 views
3

У меня возникла проблема с определением того, как обнаружить завершение в потоке потока данных TCL.Завершение в циклах потока данных TPL

У меня есть цикл обратной связи в части потока данных, который делает запросы GET на удаленный сервер и обрабатывает ответы данных (преобразуя их с большим количеством потоков данных, а затем фиксируя результаты).

Источник данных разбивает результаты на страницы 1000 записей и не укажет, сколько у него страниц доступно. Я должен просто читать, пока не получу полную страницу данных.

Обычно количество страниц 1, часто это до 10, каждый раз и снова у нас есть 1000.

У меня есть много запросов на получение в начале.
Я хочу иметь возможность использовать пул потоков, чтобы справиться с этим, все это прекрасно, я могу ставить в очередь несколько запросов на данные и запрашивать их одновременно. Если я наткнулся на экземпляр, где мне нужно получить большое количество страниц, я хочу использовать все свои потоки для этого. Я не хочу, чтобы меня оставила одна нить, отбитая, пока остальные закончили.

Проблема у меня есть, когда я бросаю эту логику в поток данных, таких как:

//generate initial requests for activity 
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp)); 

//fetch the initial requests and feedback more requests to our input buffer if we need to 
TransformBlock<DataRequest, DataResponse> fetch = null; 
fetch = new TransformBlock<DataRequest, DataResponse>(async req => 
{ 
    var resp = await Fetch(req); 

    if (resp.Results.Count == 1000) 
     await fetch.SendAsync(QueueAnotherRequest(req)); 

    return resp; 
} 
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); 

//commit each type of request 
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp)); 

request.LinkTo(fetch); 
fetch.LinkTo(commit); 

//when are we complete? 

QueueRequests производит IEnumerable<DataRequest>. Я заказываю очередные запросы N-страницы сразу, принимая это, что я посылаю несколько больше вызовов, чем мне нужно. В экземплярах DataRequest используется счетчик LastPage, чтобы избежать неудобного выполнения запросов, которые мы знаем, после последней страницы. Все это прекрасно.

Проблема:
Если я зациклился, возвращая больше запросов в входной буфер ввода, как я показал в этом примере, у меня возникла проблема с сигналом (или даже обнаружением) завершения. Я не могу установить завершение на выборку из запроса, так как после завершения установки я больше не могу получать обратную связь.

Я могу следить за тем, чтобы буферы ввода и вывода были пустыми при извлечении, но я думаю, что я бы рискнул извлечения, все еще занятой запросом, когда я устанавливал завершение, тем самым предотвращая запросы на очередность для дополнительных страниц.

Я мог бы с некоторой долей вероятности узнать, что выборка занята (либо имеет вход, либо занят обработкой ввода).

У меня отсутствует очевидный/простой способ решить эту проблему?

  • Я мог бы зацикливаться на выбор, а не на очередность запросов. Проблема заключается в том, что я хочу иметь возможность использовать максимальное количество потоков для ограничения того, что я делаю на удаленном сервере. Может ли параллельный цикл внутри блока совместно использовать планировщик с самим блоком, а итоговое количество потоков контролируется через планировщик?

  • Я мог бы создать собственный блок преобразования для извлечения для обработки сигнализации завершения. Кажется, очень много работы для такого простого сценария.

Большое спасибо за предоставленную помощь!

+0

ли вы сейчас тот момент, когда все запросы генерируются в первом блоке? – VMAtm

+0

Да, для начала конвейера я называю 'foreach (var c в todolist) {request.Post (c); }; '. Затем я могу вызвать 'request.Complete();' поскольку я больше не буду добавлять запросы. – ajk

+0

@ajk, если это то, что вы делаете, почему бы вам просто не использовать 'a.LinkTo (b, new DataflowLinkOptions {PropagateCompletion = true})' на всех ваших блочных ссылках? Тогда вызов 'request.Complete()' приведет к тому, что команда commit.Completion перейдет к завершенному состоянию, как только все элементы пройдут через все этапы вашего конвейера, естественно. –

ответ

0

Сейчас я добавил простой занятое состояние счетчика выборки блок: -

int fetch_busy = 0; 

TransformBlock<DataRequest, DataResponse> fetch_activity=null; 
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req => 
    { 
     try 
     { 
      Interlocked.Increment(ref fetch_busy); 
      var resp = await Fetch(req); 

      if (resp.Results.Count == 1000) 
      { 
       await fetch.SendAsync(QueueAnotherRequest(req)); 
      } 

      Interlocked.Decrement(ref fetch_busy); 
      return resp; 
     } 
     catch (Exception ex) 
     { 
      Interlocked.Decrement(ref fetch_busy); 
      throw ex; 
     } 
    } 
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); 

который я затем использовать для сигнала завершения следующим образом: -

request.Completion.ContinueWith(async _ => 
    { 
     while (fetch.InputCount > 0 || fetch_busy > 0) 
     { 
      await Task.Delay(100); 
     } 

     fetch.Complete(); 
    }); 

Который не кажется очень элегантно , но я должен работать.

+2

Я понимаю, что 'SendAsync' возвращает, как только он проверяет, что следующий блок с радостью примет новый элемент. Таким образом, возможно, что 'await fetch.SendAsync' завершит (после чего будет декремент' fetch_busy') до того, как «внутреннее» преобразование получило возможность снова увеличивать 'fetch_busy'. За это время возможно, что блок 'fetch' будет отмечен как полный вашим продолжением (если' fetch_busy' и 'fetch.InputCount' оба равны нулю). Если внутренняя задача 'Fetch' в полете произвела 1000 элементов и попробовала другой« SendAsync », она будет терпеть неудачу. –

+0

Это, очевидно, довольно надуманный, но не непостижимый сценарий, поэтому, возможно, вам следует бросить, если 'await fetch.SendAsync' возвращает' false'. Также помните: 'ContinueWith' с асинхронной лямбдой в качестве аргумента возвращает' Task '(это может привести к неожиданностям, если вы когда-нибудь решите что-либо сделать с результатом). –

+0

@ KirillShlenskiy спасибо за это, да я буду исследовать. Я попытался смягчить это с помощью проверки 'fetch.InputCount> 0' в' ContinueWith'. Вы хотите сказать, что 'await fetch.SendAsync' может вернуться до того, как новый запрос будет отображаться в' fetch.InputCount'? – ajk

2

В TPL DataFlow, вы можете link the blocks с DataflowLinkOptions с указанием propagation of completion of the block:

request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true }); 
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true }); 

После этого, вы просто вызвать метод Complete() для request блока, и вы сделали!

// the completion will be propagated to all the blocks 
request.Complete(); 

Последнее, что вы должны использовать это Completion задачу свойства последнего блока:

commit.Completion.ContinueWith(t => 
    { 
     /* check the status of the task and correctness of the requests handling */ 
    }); 
+0

Привет @VMAtm, да, как обсуждалось в комментариях выше, это понятно. Однако, как только завершение будет распространено на Fetch, Fetch больше не может отправлять больше сообщений в свой входной буфер. Fetch передает сообщения назад к себе, если, когда он получает ответ, он обнаруживает, что доступно больше данных. Когда завершение задано при извлечении, этот метод обратной связи больше не разрешен. – ajk

+0

Хорошо, тогда вы просто распространяете завершение с 'fetch' на' commit' и используете цикл 'request.Completion.ContinueWith' для проверки состояния' fetch', как вы в своем ответе – VMAtm

+0

большое спасибо.Я не знал, есть ли лучший способ узнать, что выборка завершена, но я могу жить с этим, если нет! – ajk