2016-12-19 6 views
0

Существует класс Counter, который содержит набор ключей и позволяет увеличивать значение каждой клавиши и получать все значения. Таким образом, задача, которую я пытаюсь решить, такая же, как и в Atomically incrementing counters stored in ConcurrentHashMap. Разница в том, что набор ключей неограничен, поэтому новые ключи добавляются часто.Приращение и удаление элементов ConcurrentHashMap

Чтобы уменьшить потребление памяти, я очищаю значения после их считывания, это происходит в Counter.getAndClear(). Ключи также удалены, и это, похоже, нарушает все.

Один поток увеличивает случайные ключи, а другой поток получает моментальные снимки всех значений и очищает их.

код ниже:

import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.ThreadLocalRandom; 
import java.util.Map; 
import java.util.HashMap; 
import java.lang.Thread; 

class HashMapTest { 
    private final static int hashMapInitSize = 170; 
    private final static int maxKeys = 100; 
    private final static int nIterations = 10_000_000; 
    private final static int sleepMs = 100; 

    private static class Counter { 
     private ConcurrentMap<String, Long> map; 

     public Counter() { 
      map = new ConcurrentHashMap<String, Long>(hashMapInitSize); 
     } 

     public void increment(String key) { 
      Long value; 
      do { 
       value = map.computeIfAbsent(key, k -> 0L); 
      } while (!map.replace(key, value, value + 1L)); 
     } 

     public Map<String, Long> getAndClear() { 
      Map<String, Long> mapCopy = new HashMap<String, Long>(); 
      for (String key : map.keySet()) { 
       Long removedValue = map.remove(key); 
       if (removedValue != null) 
        mapCopy.put(key, removedValue); 
      } 
      return mapCopy; 
     } 
    } 

    // The code below is used for testing 
    public static void main(String[] args) throws InterruptedException { 
     Counter counter = new Counter(); 
     Thread thread = new Thread(new Runnable() { 
      public void run() { 
       for (int j = 0; j < nIterations; j++) { 
        int index = ThreadLocalRandom.current().nextInt(maxKeys); 
        counter.increment(Integer.toString(index)); 
       } 
      } 
     }, "incrementThread"); 
     Thread readerThread = new Thread(new Runnable() { 
      public void run() { 
       long sum = 0; 
       boolean isDone = false; 
       while (!isDone) { 
        try { 
         Thread.sleep(sleepMs); 
        } 
        catch (InterruptedException e) { 
         isDone = true; 
        } 
        Map<String, Long> map = counter.getAndClear(); 
        for (Map.Entry<String, Long> entry : map.entrySet()) { 
         Long value = entry.getValue(); 
         sum += value; 
        } 
        System.out.println("mapSize: " + map.size()); 
       } 
       System.out.println("sum: " + sum); 
       System.out.println("expected: " + nIterations); 
      } 
     }, "readerThread"); 
     thread.start(); 
     readerThread.start(); 
     thread.join(); 
     readerThread.interrupt(); 
     readerThread.join(); 
     // Ensure that counter is empty 
     System.out.println("elements left in map: " + counter.getAndClear().size()); 
    } 
} 

время тестирования я заметил, что некоторые приращения теряются. Я получаю следующие результаты:

sum: 9993354 
expected: 10000000 
elements left in map: 0 

Если вы не можете воспроизвести эту ошибку (что сумма меньше, чем ожидалось), вы можете попытаться увеличить maxKeys несколько порядков или уменьшить или увеличить hashMapInitSize nIterations (последний также увеличивает время выполнения). Я также включил тестовый код (основной метод) в случае, если он имеет какие-либо ошибки.

Я подозреваю, что ошибка возникает, когда емкость ConcurrentHashMap увеличивается во время выполнения. На моем компьютере код работает корректно, когда hashMapInitSize равен 170, но сбой при hashMapInitSize равен 171. Я считаю, что размер 171 триггеров увеличивает емкость (128/0.75 == 170.66, где 0.75 - коэффициент нагрузки по умолчанию для хэш-карты) ,

Итак, вопрос в том, что я использую remove, replace и computeIfAbsent операции правильно? Я предполагаю, что они являются атомными операциями на ConcurrentHashMap на основе ответов на Use of ConcurrentHashMap eliminates data-visibility troubles?. Если да, то почему некоторые приращения потеряны?

EDIT:

Я думаю, что я пропустил важную деталь здесь increment() предполагается назвать гораздо чаще, чем getAndClear(), так что я стараюсь избегать какой-либо явной блокировки в increment(). Тем не менее, я собираюсь проверить производительность различных версий позже, чтобы увидеть, действительно ли это проблема.

+0

Есть ли причина, чтобы не использовать [ 'AtomicLong's] (https://docs.oracle.com/javase/8 /docs/api/java/util/concurrent/atomic/AtomicLong.html)? –

+0

@ ChaiT.Rex На самом деле я начал с использования AtomicLong, но оказалось, что это намного сложнее, потому что и ConcurrentHashMap, и AtomicLong имеют атомарные операции, но они не являются атомарными при объединении (рассмотрим 'map.computeIfAbsent (key, k -> новый AtomicLong (0L)). incrementAndGet() '). Это можно было бы решить с явной синхронизацией, но это значительно сокращает пропускную способность. – user502144

+0

Ваш метод 'increment' должен действительно быть просто' map.merge (key, 1L, Long :: sum) '. Я бы действительно сомневался в том, что вы можете создать потокобезопасный 'getAndClear()' независимо от того, что вы сделали. –

ответ

1

Я считаю, что проблема заключается в использовании remove, итерации по keySet. Это то, что говорит JavaDoc для Map#keySet() (мой акцент):

Возвращает заданный вид ключей, содержащихся на этой карте. Набор поддерживается картой, поэтому изменения в карте отражаются в наборе и наоборот. Если карта изменена, когда выполняется итерация по множеству (за исключением операции собственного удаления итератора), результаты итерации не определены.

JavaDoc для ConcurrentHashMap дать дополнительные ключи:

Аналогично, итераторы, Spliterators и Перечисления возвращают элементы, отражающие состояние хеш-таблицы в некоторой точке на или с момента создания итератора/перечисления.

Вывод состоит в том, что мутация карты в то время как итерация по клавишам не предикативна.

Одним из решений является создание новой карты для операции getAndClear() и просто возврат старой карты. Выключатель должен быть защищен, и в приведенном ниже примере я использовал ReentrantReadWriteLock:

class HashMapTest { 
private final static int hashMapInitSize = 170; 
private final static int maxKeys = 100; 
private final static int nIterations = 10_000_000; 
private final static int sleepMs = 100; 

private static class Counter { 
    private ConcurrentMap<String, Long> map; 
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 
    ReadLock readLock = lock.readLock(); 
    WriteLock writeLock = lock.writeLock(); 

    public Counter() { 
     map = new ConcurrentHashMap<>(hashMapInitSize); 
    } 

    public void increment(String key) { 
     readLock.lock(); 
     try { 
      map.merge(key, 1L, Long::sum); 
     } finally { 
      readLock.unlock(); 
     } 
    } 

    public Map<String, Long> getAndClear() { 
     ConcurrentMap<String, Long> oldMap; 
     writeLock.lock(); 
     try { 
      oldMap = map; 
      map = new ConcurrentHashMap<>(hashMapInitSize); 
     } finally { 
      writeLock.unlock(); 
     } 

     return oldMap; 
    } 
} 

// The code below is used for testing 
public static void main(String[] args) throws InterruptedException { 
    final AtomicBoolean ready = new AtomicBoolean(false); 

    Counter counter = new Counter(); 
    Thread thread = new Thread(new Runnable() { 
     public void run() { 
      for (int j = 0; j < nIterations; j++) { 
       int index = ThreadLocalRandom.current().nextInt(maxKeys); 
       counter.increment(Integer.toString(index)); 
      } 
     } 
    }, "incrementThread"); 

    Thread readerThread = new Thread(new Runnable() { 
     public void run() { 
      long sum = 0; 
      while (!ready.get()) { 
       try { 
        Thread.sleep(sleepMs); 
       } catch (InterruptedException e) { 
        // 
       } 
       Map<String, Long> map = counter.getAndClear(); 
       for (Map.Entry<String, Long> entry : map.entrySet()) { 
        Long value = entry.getValue(); 
        sum += value; 
       } 
       System.out.println("mapSize: " + map.size()); 
      } 
      System.out.println("sum: " + sum); 
      System.out.println("expected: " + nIterations); 
     } 
    }, "readerThread"); 
    thread.start(); 
    readerThread.start(); 
    thread.join(); 
    ready.set(true); 
    readerThread.join(); 
    // Ensure that counter is empty 
    System.out.println("elements left in map: " + counter.getAndClear().size()); 
} 
} 
+0

Спасибо, что указали на проблему! Я читал об удалении элементов из ConcurrentHashMap во время его итерации, а два метода перепутались в моей голове: 'Iterator.remove' и' Map.remove', оставив меня с неправильным впечатлением, что безопасно называть 'Map.remove' при повторении , Однако безопасно называть 'Iterator.remove'. – user502144

+0

Я попытался скопировать набор ключей перед удалением элементов, а краткое тестирование показывает, что оно может работать. – user502144