2012-05-24 3 views
6

Я пытаюсь реализовать ConcurrentDictionary, обернув его в BlockingCollection, но, похоже, не был успешным.Как скопировать ConcurrentDictionary в BlockingCollection?

Я понимаю, что одна переменная декларации работать с BlockingCollection, такие как ConcurrentBag<T>, ConcurrentQueue<T> и т.д.

Таким образом, чтобы создать ConcurrentBag завернутое в BlockingCollection я бы объявить и создать экземпляр так:

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>()); 

но как это сделать для ConcurrentDictionary? Мне нужна блокирующая функциональность BlockingCollection как на стороне производителя, так и на потребителя.

+0

Словарь (и ConcurrentDictionary тоже) не сохраняет порядок элементов. Можете ли вы описать свой сценарий производителя-потребителя? – Dennis

+0

@ Dennis, я это знаю. Производитель хранит KeyValuePairs в concurrentDictionary, а потребительская задача увеличивает int и удаляет KeyValuePair, если int совпадает с соответствующим ключом. Я делаю это, потому что рабочие задачи заполняют concurrentDictionary со значениями, но в произвольном порядке, потребительская задача гарантирует, что полученные значения передаются/обрабатываются в правильном порядке. Можно ли заключить ConcurrentDictionary в BlockingCollection? –

+0

Какое решение вы придумали? Я пытаюсь найти хорошее решение аналогичной проблемы, когда производитель не производит товары в порядке, необходимом для потребителя. (старый пост я знаю, но стоит попробовать) – Kim

ответ

1

Вам нужно будет написать свой собственный класс адаптера - что-то вроде:

public class ConcurrentDictionaryWrapper<TKey,TValue> : IProducerConsumerCollection<KeyValuePair<TKey,TValue>> 
{ 
    private ConcurrentDictionary<TKey, TValue> dictionary; 

    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() 
    { 
     return dictionary.GetEnumerator(); 
    } 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return GetEnumerator(); 
    } 

    public void CopyTo(Array array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public int Count 
    { 
     get { return dictionary.Count; } 
    } 

    public object SyncRoot 
    { 
     get { return this; } 
    } 

    public bool IsSynchronized 
    { 
     get { return true; } 
    } 

    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index) 
    { 
     throw new NotImplementedException(); 
    } 

    public bool TryAdd(KeyValuePair<TKey, TValue> item) 
    { 
     return dictionary.TryAdd(item.Key, item.Value); 
    } 

    public bool TryTake(out KeyValuePair<TKey, TValue> item) 
    { 
     item = dictionary.FirstOrDefault(); 
     TValue value; 
     return dictionary.TryRemove(item.Key, out value); 
    } 

    public KeyValuePair<TKey, TValue>[] ToArray() 
    { 
     throw new NotImplementedException(); 
    } 
} 
+1

Спасибо за предложение кода. Но моей основной целью использования BlockingCollection была возможность пометить коллекцию как Adding Completed и проверить ее статус, а также добавить ее полную и пустую, аналогичную тому, что предоставляет BlockingCollection. Я знаю, что я могу легко добавить такую ​​функциональность, но я ищу предложение, как это сделать непосредственно через BlockingCollection. Пока я не вижу причин, по которым он не может работать через сборку блокировки напрямую. Может быть, только IProducerConsumerCollection ? –

4

Может быть, вам нужен параллельный словарь BlockingCollection

 ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>(); 
     int maxBoxes = 5; 

     CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); 
     CancellationToken cancelationToken = cancelationTokenSource.Token; 

     Random rnd = new Random(); 
     // Producer 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // put the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       box.Add("some message " + index, cancelationToken); 
       Console.WriteLine("Produced a letter to put in box " + index); 

       // Wait simulating a heavy production item. 
       Thread.Sleep(1000); 
      } 
     }); 

     // Consumer 1 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 1: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     // Consumer 2 
     Task.Factory.StartNew(() => 
     { 
      while (true) 
      { 
       int index = rnd.Next(0, maxBoxes); 
       // get the letter in the mailbox 'index' 
       var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>()); 
       var message = box.Take(cancelationToken); 
       Console.WriteLine("Consumed 2: " + message); 

       // consume a item cost less than produce it: 
       Thread.Sleep(50); 
      } 
     }); 

     Console.ReadLine(); 
     cancelationTokenSource.Cancel(); 

К таким образом, потребитель, который ожидает что-то в почтовом ящике 5 будет ждать, пока продукт не отправит письмо в почтовый ящик 5.

 Смежные вопросы

  • Нет связанных вопросов^_^