У меня возникла проблема с определением того, как обнаружить завершение в потоке потока данных TCL.Завершение в циклах потока данных TPL
У меня есть цикл обратной связи в части потока данных, который делает запросы GET
на удаленный сервер и обрабатывает ответы данных (преобразуя их с большим количеством потоков данных, а затем фиксируя результаты).
Источник данных разбивает результаты на страницы 1000 записей и не укажет, сколько у него страниц доступно. Я должен просто читать, пока не получу полную страницу данных.
Обычно количество страниц 1, часто это до 10, каждый раз и снова у нас есть 1000.
У меня есть много запросов на получение в начале.
Я хочу иметь возможность использовать пул потоков, чтобы справиться с этим, все это прекрасно, я могу ставить в очередь несколько запросов на данные и запрашивать их одновременно. Если я наткнулся на экземпляр, где мне нужно получить большое количество страниц, я хочу использовать все свои потоки для этого. Я не хочу, чтобы меня оставила одна нить, отбитая, пока остальные закончили.
Проблема у меня есть, когда я бросаю эту логику в поток данных, таких как:
//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));
//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
await fetch.SendAsync(QueueAnotherRequest(req));
return resp;
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));
request.LinkTo(fetch);
fetch.LinkTo(commit);
//when are we complete?
QueueRequests
производит IEnumerable<DataRequest>
. Я заказываю очередные запросы N-страницы сразу, принимая это, что я посылаю несколько больше вызовов, чем мне нужно. В экземплярах DataRequest используется счетчик LastPage, чтобы избежать неудобного выполнения запросов, которые мы знаем, после последней страницы. Все это прекрасно.
Проблема:
Если я зациклился, возвращая больше запросов в входной буфер ввода, как я показал в этом примере, у меня возникла проблема с сигналом (или даже обнаружением) завершения. Я не могу установить завершение на выборку из запроса, так как после завершения установки я больше не могу получать обратную связь.
Я могу следить за тем, чтобы буферы ввода и вывода были пустыми при извлечении, но я думаю, что я бы рискнул извлечения, все еще занятой запросом, когда я устанавливал завершение, тем самым предотвращая запросы на очередность для дополнительных страниц.
Я мог бы с некоторой долей вероятности узнать, что выборка занята (либо имеет вход, либо занят обработкой ввода).
У меня отсутствует очевидный/простой способ решить эту проблему?
Я мог бы зацикливаться на выбор, а не на очередность запросов. Проблема заключается в том, что я хочу иметь возможность использовать максимальное количество потоков для ограничения того, что я делаю на удаленном сервере. Может ли параллельный цикл внутри блока совместно использовать планировщик с самим блоком, а итоговое количество потоков контролируется через планировщик?
Я мог бы создать собственный блок преобразования для извлечения для обработки сигнализации завершения. Кажется, очень много работы для такого простого сценария.
Большое спасибо за предоставленную помощь!
ли вы сейчас тот момент, когда все запросы генерируются в первом блоке? – VMAtm
Да, для начала конвейера я называю 'foreach (var c в todolist) {request.Post (c); }; '. Затем я могу вызвать 'request.Complete();' поскольку я больше не буду добавлять запросы. – ajk
@ajk, если это то, что вы делаете, почему бы вам просто не использовать 'a.LinkTo (b, new DataflowLinkOptions {PropagateCompletion = true})' на всех ваших блочных ссылках? Тогда вызов 'request.Complete()' приведет к тому, что команда commit.Completion перейдет к завершенному состоянию, как только все элементы пройдут через все этапы вашего конвейера, естественно. –