Существует класс Counter
, который содержит набор ключей и позволяет увеличивать значение каждой клавиши и получать все значения. Таким образом, задача, которую я пытаюсь решить, такая же, как и в Atomically incrementing counters stored in ConcurrentHashMap. Разница в том, что набор ключей неограничен, поэтому новые ключи добавляются часто.Приращение и удаление элементов ConcurrentHashMap
Чтобы уменьшить потребление памяти, я очищаю значения после их считывания, это происходит в Counter.getAndClear()
. Ключи также удалены, и это, похоже, нарушает все.
Один поток увеличивает случайные ключи, а другой поток получает моментальные снимки всех значений и очищает их.
код ниже:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Map;
import java.util.HashMap;
import java.lang.Thread;
class HashMapTest {
private final static int hashMapInitSize = 170;
private final static int maxKeys = 100;
private final static int nIterations = 10_000_000;
private final static int sleepMs = 100;
private static class Counter {
private ConcurrentMap<String, Long> map;
public Counter() {
map = new ConcurrentHashMap<String, Long>(hashMapInitSize);
}
public void increment(String key) {
Long value;
do {
value = map.computeIfAbsent(key, k -> 0L);
} while (!map.replace(key, value, value + 1L));
}
public Map<String, Long> getAndClear() {
Map<String, Long> mapCopy = new HashMap<String, Long>();
for (String key : map.keySet()) {
Long removedValue = map.remove(key);
if (removedValue != null)
mapCopy.put(key, removedValue);
}
return mapCopy;
}
}
// The code below is used for testing
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread thread = new Thread(new Runnable() {
public void run() {
for (int j = 0; j < nIterations; j++) {
int index = ThreadLocalRandom.current().nextInt(maxKeys);
counter.increment(Integer.toString(index));
}
}
}, "incrementThread");
Thread readerThread = new Thread(new Runnable() {
public void run() {
long sum = 0;
boolean isDone = false;
while (!isDone) {
try {
Thread.sleep(sleepMs);
}
catch (InterruptedException e) {
isDone = true;
}
Map<String, Long> map = counter.getAndClear();
for (Map.Entry<String, Long> entry : map.entrySet()) {
Long value = entry.getValue();
sum += value;
}
System.out.println("mapSize: " + map.size());
}
System.out.println("sum: " + sum);
System.out.println("expected: " + nIterations);
}
}, "readerThread");
thread.start();
readerThread.start();
thread.join();
readerThread.interrupt();
readerThread.join();
// Ensure that counter is empty
System.out.println("elements left in map: " + counter.getAndClear().size());
}
}
время тестирования я заметил, что некоторые приращения теряются. Я получаю следующие результаты:
sum: 9993354
expected: 10000000
elements left in map: 0
Если вы не можете воспроизвести эту ошибку (что сумма меньше, чем ожидалось), вы можете попытаться увеличить maxKeys несколько порядков или уменьшить или увеличить hashMapInitSize nIterations (последний также увеличивает время выполнения). Я также включил тестовый код (основной метод) в случае, если он имеет какие-либо ошибки.
Я подозреваю, что ошибка возникает, когда емкость ConcurrentHashMap увеличивается во время выполнения. На моем компьютере код работает корректно, когда hashMapInitSize
равен 170, но сбой при hashMapInitSize
равен 171. Я считаю, что размер 171 триггеров увеличивает емкость (128/0.75 == 170.66, где 0.75 - коэффициент нагрузки по умолчанию для хэш-карты) ,
Итак, вопрос в том, что я использую remove
, replace
и computeIfAbsent
операции правильно? Я предполагаю, что они являются атомными операциями на ConcurrentHashMap
на основе ответов на Use of ConcurrentHashMap eliminates data-visibility troubles?. Если да, то почему некоторые приращения потеряны?
EDIT:
Я думаю, что я пропустил важную деталь здесь increment()
предполагается назвать гораздо чаще, чем getAndClear()
, так что я стараюсь избегать какой-либо явной блокировки в increment()
. Тем не менее, я собираюсь проверить производительность различных версий позже, чтобы увидеть, действительно ли это проблема.
Есть ли причина, чтобы не использовать [ 'AtomicLong's] (https://docs.oracle.com/javase/8 /docs/api/java/util/concurrent/atomic/AtomicLong.html)? –
@ ChaiT.Rex На самом деле я начал с использования AtomicLong, но оказалось, что это намного сложнее, потому что и ConcurrentHashMap, и AtomicLong имеют атомарные операции, но они не являются атомарными при объединении (рассмотрим 'map.computeIfAbsent (key, k -> новый AtomicLong (0L)). incrementAndGet() '). Это можно было бы решить с явной синхронизацией, но это значительно сокращает пропускную способность. – user502144
Ваш метод 'increment' должен действительно быть просто' map.merge (key, 1L, Long :: sum) '. Я бы действительно сомневался в том, что вы можете создать потокобезопасный 'getAndClear()' независимо от того, что вы сделали. –