2013-09-17 2 views
8

После игры с потоком данных я столкнулся с новой проблемой. Я хотел бы ограничить входной уровень всех блоков. Мой productionblock (ActionBlock) быстро создает 5000 элементов и помещает их в широковещательный блок. Поэтому, если я установил BoundedCapacity в широковещательном блоке на 100, он выбросит много данных. Но я бы предпочел, чтобы производственный блок дождался новых слотов во входном буфере моего буфера.поток данных tpl: фиксированный размер буфера без отбрасывания элементов

Есть ли способ избавиться от этой проблемы?

ответ

7

Это точно, что BufferBlock для. Если вы установите его BoundedCapacity и он будет заполнен, он отложит получение любых сообщений, пока кто-то их не будет использовать. Это означает, что, например, Post() будет заблокирован, а SendAsync() вернет незавершенный Task.

EDIT: Нет встроенного блока, который отправляет несколько целей и никогда не удаляет данные. Но вы можете легко построить один из себя ActionBlock и представляемого цикла:

static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, int boundedCapacity) 
{ 
    var targetsList = targets.ToList(); 

    var block = new ActionBlock<T>(
     async item => 
     { 
      foreach (var target in targetsList) 
      { 
       await target.SendAsync(item); 
      } 
     }, 
     new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity }); 

    // TODO: propagate completion from block to targets 

    return block; 
} 

Этот код предполагает, что вам не нужно клонировать данные для каждой цели и что список целей никогда не меняется. Изменение кода для этого должно быть довольно простым.

+0

Прежде всего - спасибо. Я был абсолютно слеп .... Но мне нужен блок, который может отправлять сообщения нескольким ресиверам. Это не возможно с помощью буферного блока. Любая идея, как это решить? – Daffi

+0

Что должно случиться, когда один из приемников работает медленно? Должны ли другие приемники ждать его? (Я предполагаю, что у приемников также есть параметр «BoundedCapacity», иначе установка BoundedCapacity в буфере в основном не будет иметь эффекта.) – svick

+0

yes, BoundedCapacity всегда имеет одно значение. Также верно, что другим приемникам нужно ждать медленной работы. – Daffi