2016-05-10 2 views
8

Я хочу периодически перебрать ConcurrentHashMap при удалении записей, например:перебрать ConcurrentHashMap во время удаления записей

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext();) { 
    Entry<Integer, Integer> entry = iter.next(); 
    // do something 
    iter.remove(); 
} 

Проблема заключается в том, что другой поток может обновлять или изменять значения, пока я итерация. Если это произойдет, эти обновления могут быть потеряны навсегда, потому что мой поток видит только устаревшие значения во время итерации, но remove() удалит живую запись.

После некоторых раздумий, я придумал этот обходной путь:

map.forEach((key, value) -> { 
    // delete if value is up to date, otherwise leave for next round 
    if (map.remove(key, value)) { 
     // do something 
    } 
}); 

Одна проблема состоит в том, что она не будет ловить изменения в изменяемых значений, которые не реализуют equals() (например, AtomicInteger). Есть ли лучший способ безопасного удаления с одновременными изменениями?

+0

Почему бы не удалить запись перед выполнением каких-либо работ. –

+0

@ClaudioCorsi, который не изменит тот факт, что я вижу устаревшую версию удаленной записи. – shmosel

+0

Проблема в том, что вам нужно знать, что было обновлено с тех пор, как вы начали выполнять итерацию по карте. Даже если вы можете узнать, какие объекты были обновлены. По-прежнему возможно, что другой поток имеет ссылку на объект, который был обработан, но этот объект не был обновлен. Будет ли этот объект добавлен обратно или он просто обновляется? Должен ли этот объект генерировать другой обратный вызов? –

ответ

1

Ваше обходное решение работает, но есть один потенциальный сценарий. Если некоторые записи имеют постоянные обновления, map.remove (key, value) никогда не вернет true до тех пор, пока не закончится обновление.

Если вы используете JDK8 здесь мое решение

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext();) { 
    Entry<Integer, Integer> entry = iter.next(); 
    Map.compute(entry.getKey(), (k, v) -> f(v)); 
    //do something for prevValue 
} 
.... 
private Integer prevValue; 

private Integer f(Integer v){ 
    prevValue = v; 
    return null; 
} 

вычисления() будет применяться п (v) значение и в нашем случае присвоить значение глобальной переменной и удалить запись.

Согласно Джавадоку, он является атомарным.

Попытка вычислить отображение для указанного ключа и его текущего отображаемого значения (или null, если нет текущего сопоставления). Весь вызов метода выполняется атомарно. Некоторые попытки выполнить операции обновления на этой карте другими потоками могут быть заблокированы, пока выполняется вычисление, поэтому вычисление должно быть коротким и простым и не должно пытаться обновлять любые другие сопоставления этой Карты.

+0

'compute()' будет возвращать значение null, если 'f()' возвращает значение null. – shmosel

+0

Кроме того, было бы более простым перебирать ключи с помощью вашего подхода. – shmosel

1

Ваше обходное решение на самом деле довольно хорошее. Существуют и другие объекты, поверх которых вы можете построить несколько схожее решение (например, используя computeIfPresent() и значения надгробных памятников), но у них есть свои собственные предостережения, и я использовал их в несколько разных вариантах использования.

Что касается типа, который не реализует значения equals() для значений карты, вы можете использовать свою собственную оболочку поверх соответствующего типа. Это самый простой способ добавить пользовательскую семантику для равенства объектов в операции атомарного замещения/удаления, предоставленные ConcurrentMap.

Update

Вот эскиз, который показывает, как можно построить на вершине ConcurrentMap.remove(Object key, Object value) API:

  • Определить тип обертки поверх изменяемом типа используемого для значений, а также определяя ваш собственный метод equals(), построенный поверх текущего изменяемого значения.
  • В вашем BiConsumer (лямбда вы переходите на forEach), создайте глубокую копию значения (типа вашего нового типа обертки) и выполните свою логику, определяющую необходимость удаления значения в копии.
  • Если значение необходимо удалить, позвоните по номеру remove(myKey, myValueCopy).
    • Если произошли некоторые изменения, в то время как одновременно вы вычисляли, нужно ли значение, которое будет удалено, remove(myKey, myValueCopy) вернется false (за исключением проблемы ABA, которые являются отдельной темой).

Вот код, иллюстрирующий это: "Заменено"

import java.util.Random; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.atomic.AtomicInteger; 

public class Playground { 

    private static class AtomicIntegerWrapper { 
     private final AtomicInteger value; 

     AtomicIntegerWrapper(int value) { 
     this.value = new AtomicInteger(value); 
     } 

     public void set(int value) { 
     this.value.set(value); 
     } 

     public int get() { 
     return this.value.get(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
     if (this == obj) { 
      return true; 
     } 
     if (!(obj instanceof AtomicIntegerWrapper)) { 
      return false; 
     } 
     AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj; 
     if (other.value.get() == this.value.get()) { 
      return true; 
     } 
     return false; 
     } 

     public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) { 
     int wrapped = wrapper.get(); 
     return new AtomicIntegerWrapper(wrapped); 
     } 
    } 

    private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP 
     = new ConcurrentHashMap<>(); 

    private static final int NUM_THREADS = 3; 

    public static void main(String[] args) throws InterruptedException { 
     for (int i = 0; i < 10; ++i) { 
     MAP.put(i, new AtomicIntegerWrapper(1)); 
     } 

     Thread.sleep(1); 

     for (int i = 0; i < NUM_THREADS; ++i) { 
     new Thread(() -> { 
      Random rnd = new Random(); 
      while (!MAP.isEmpty()) { 
       MAP.forEach((key, value) -> { 
        AtomicIntegerWrapper elem = MAP.get(key); 
        if (elem == null) { 
        System.out.println("Oops..."); 
        } else if (elem.get() == 1986) { 
        elem.set(1); 
        } else if ((rnd.nextInt() & 128) == 0) { 
        elem.set(1986); 
        } 
       }); 
      } 
     }).start(); 
     } 

     Thread.sleep(1); 

     new Thread(() -> { 
     Random rnd = new Random(); 
     while (!MAP.isEmpty()) { 
      MAP.forEach((key, value) -> { 
       AtomicIntegerWrapper elem = 
        AtomicIntegerWrapper.deepCopy(MAP.get(key)); 
       if (elem.get() == 1986) { 
        try { 
        Thread.sleep(10); 
        } catch (Exception e) {} 
        boolean replaced = MAP.remove(key, elem); 
        if (!replaced) { 
        System.out.println("Bailed out!"); 
        } else { 
        System.out.println("Replaced!"); 
        } 
       } 
      }); 
     } 
     }).start(); 
    } 
} 

Вы увидите распечаток "! Выручили", перемешаны с (удаление было успешным, так как не было никаких параллельных обновлений, о которых вы заботитесь), и расчет остановится в какой-то момент.

  • Если вы удалите метод пользовательских equals() и продолжают использовать копию, вы увидите бесконечный поток «выручили!», Потому что копия никогда не считается равным значению в карте.
  • Если вы не используете копию, вы не увидите «Bailed out!». напечатаны, и вы столкнетесь с проблемой, которую вы объясняете, - значения удаляются независимо от одновременных изменений.
+0

На самом деле, я думаю, что моя точка 'equals()' была немного неактуальна. Проблема будет возникать до тех пор, пока значение мутируется, а не заменяется, потому что 'remove()' видит ту же ссылку. Я предполагаю, что оболочка будет работать, если мы захотим создать новую для каждого обновления. – shmosel

+0

Если я ошибаюсь, я думаю, что этот подход отлично подходит для изменения мутаций. Я обновляю свой ответ, чтобы добавить образец кода, иллюстрирующий подход. –

+0

Ваше решение работает, но моя идея немного проще для моих требований. Мне не нужно проверять текущее значение перед удалением; Я хочу удалить * любое * значение, пока я вижу последнюю версию. Таким образом, все, что мне нужно сделать, это создать мелкую копию оболочки при обновлении, чтобы избежать ссылочного равенства, и полагаться на гарантии атомарности 'remove()' и операции обновления (через 'compute()', 'merge()' и т. Д.), чтобы гарантировать невозможность изменения значений после успешного удаления. – shmosel

0

Давайте рассмотрим, какие у вас варианты.

  1. Создайте свой собственный контейнер класса с isUpdated() операции и использовать свой собственный обходной путь.

  2. Если ваша карта содержит всего несколько элементов, и вы часто повторяете карту по сравнению с операцией put/delete. Это может быть хорошим выбором для использования CopyOnWriteArrayList CopyOnWriteArrayList<Entry<Integer, Integer>> lookupArray = ...;

  3. Другой вариант заключается в реализации собственного CopyOnWriteMap

    public class CopyOnWriteMap<K, V> implements Map<K, V>{ 
    
        private volatile Map<K, V> currentMap; 
    
        public V put(K key, V value) { 
         synchronized (this) { 
          Map<K, V> newOne = new HashMap<K, V>(this.currentMap); 
          V val = newOne.put(key, value); 
          this.currentMap = newOne; // atomic operation 
          return val; 
         } 
        } 
    
        public V remove(Object key) { 
         synchronized (this) { 
          Map<K, V> newOne = new HashMap<K, V>(this.currentMap); 
          V val = newOne.remove(key); 
          this.currentMap = newOne; // atomic operation 
          return val; 
         } 
        } 
    
        [...] 
    } 
    

Существует отрицательная побочный эффект. Если вы используете коллекцию «Copy-on-Write», ваши обновления никогда не будут потеряны, но вы можете снова увидеть предыдущую удаленную запись.

Худший случай: удаленная запись будет восстановлена ​​каждый раз, если карта будет скопирована.