2015-11-03 6 views
12

Есть писатель, который обновляет цены, вызывая метод putPrice. Читатель использует getPrice, чтобы получить последнюю цену. hasChangedMethod возвращает логическую идентификацию, если цена была изменена с последнего вызова getPrice.Java-совместимые замки на уровне ключа карты

Я ищу быстрое решение. Я пытаюсь достичь потокобезопасного последовательного чтения/записи на карте на ключевом уровне.

Я думаю, что блокировка всей карты может вызвать проблему с производительностью, поэтому я решил сделать ее на ключевом уровне. К сожалению, он не работает должным образом и блокирует всю карту. Зачем? Не могли бы вы помочь мне разобраться, что я здесь делаю неправильно?

UPDATE:

Я думаю, мы можем суммировать два вопроса: 1. Как обеспечить свободный доступ к остальной части ключей, если один находится в процессе обновления. 2. Как я могу гарантировать атомные операции своих методов, так как они требуют множественных операций чтения/записи. например getPrice() - цена и обновление hasChanged.

PriceHolder.java

public final class PriceHolder { 

    private ConcurrentMap<String, Price> prices; 

    public PriceHolder() { 
     this.prices = new ConcurrentHashMap<>(); 

     //Receive starting prices.. 
     Price EUR = new Price(); 
     EUR.setHasChangedSinceLastRead(true); 
     EUR.setPrice(new BigDecimal(0)); 

     Price USD = new Price(); 
     USD.setHasChangedSinceLastRead(true); 
     USD.setPrice(new BigDecimal(0)); 
     this.prices.put("EUR", EUR); 
     this.prices.put("USD", USD); 

    } 

    /** Called when a price ‘p’ is received for an entity ‘e’ */ 
    public void putPrice(
      String e, 
      BigDecimal p) throws InterruptedException { 

      synchronized (prices.get(e)) { 
       Price currentPrice = prices.get(e); 
       if (currentPrice != null && !currentPrice.getPrice().equals(p)) { 
        currentPrice.setHasChangedSinceLastRead(true); 
        currentPrice.setPrice(p); 
       } else { 
        Price newPrice = new Price(); 
        newPrice.setHasChangedSinceLastRead(true); 
        newPrice.setPrice(p); 
        prices.put(e, newPrice); 
       } 
      } 
    } 

    /** Called to get the latest price for entity ‘e’ */ 
    public BigDecimal getPrice(String e) { 
     Price currentPrice = prices.get(e); 
     if(currentPrice != null){ 
      synchronized (prices.get(e)){ 
       currentPrice.setHasChangedSinceLastRead(false); 
       prices.put(e, currentPrice); 
      } 
      return currentPrice.getPrice(); 
     } 
     return null; 
    } 

    /** 
    * Called to determine if the price for entity ‘e’ has 
    * changed since the last call to getPrice(e). 
    */ 
    public boolean hasPriceChanged(String e) { 
     synchronized (prices.get(e)){ 
      return prices.get(e) != null ? prices.get(e).isHasChangedSinceLastRead() : false; 
     } 
    } 
} 

Price.java

public class Price { 

    private BigDecimal price; 

    public boolean isHasChangedSinceLastRead() { 
     return hasChangedSinceLastRead; 
    } 

    public void setHasChangedSinceLastRead(boolean hasChangedSinceLastRead) { 
     this.hasChangedSinceLastRead = hasChangedSinceLastRead; 
    } 

    public BigDecimal getPrice() { 
     return price; 
    } 

    public void setPrice(BigDecimal price) { 
     this.price = price; 
    } 

    private boolean hasChangedSinceLastRead = false; 

} 
+0

Вы знаете заранее, что ключи (я имею в виду названия валюты, такие как евро и доллары США) будут находиться в режиме разработки или писатель может также поставить некоторые новые валюты во время выбега время? – IzCe

+0

@IzCe, да, я знаю все ключи заранее. В основном количество валют в мире. Может быть, я немного смущен, тогда как бы я заблокировал его на ключевом уровне? –

+1

Поскольку вы знаете ключи заранее, вам не нужно использовать ConcurrentMap. Достаточно просто Карта. Во время инициализации вашей программы основной поток может заполнить карту этими клавишами и объектами Price со значениями по умолчанию. Все, что вам нужно - это получить объект Price и синхронизировать его для каждой операции get и put. Вам не нужно использовать 'Thread.sleep (3000);' вызов в синхронизированном блоке, так как нет смысла ждать потока в таком критическом пути. – IzCe

ответ

1

Как о чем-то вроде

class AtomicPriceHolder { 

    private volatile BigDecimal value; 
    private volatile boolean dirtyFlag; 

    public AtomicPriceHolder(BigDecimal initialValue) { 
    this.value = initialValue; 
    this.dirtyFlag = true; 
    } 

    public synchronized void updatePrice(BigDecimal newPrice) { 
    if (this.value.equals(newPrice) == false) { 
     this.value = newPrice; 
     this.dirtyFlag = true; 
    } 
    } 

    public boolean isDirty() { 
    return this.dirtyFlag; 
    } 

    public BigDecimal peek() { 
    return this.value; 
    } 

    public synchronized BigDecimal read() { 
    this.dirtyFlag = false; 
    return this.value; 
    } 

} 

...

public void updatePrice(String id, BigDecimal value) { 

    AtomicPriceHolder holder; 
    synchronized(someGlobalSyncObject) { 
    holder = prices.get(id); 
    if (holder == null) { 
     prices.put(id, new AtomicPriceHolder(value)); 
     return; 
    } 
    } 

    holder.updatePrice(value); 

} 

Обратите внимание, что это, вероятно, не имеет никакого смысла в этом смысле, потому что фактическая атомная модификация стоимости цены настолько велика, что вы не можете ожидать ничего выиграть от разблокировки карты раньше.

Условные операции «проверьте, находится ли он на карте, создайте новый и вставьте, если нет», должны быть атомарными и должны выполняться путем блокировки всей карты за этот короткий период. Для чего-то еще потребуется выделенный объект синхронизации для каждого ключа. Их нужно будет хранить и управлять где-то, и доступ к этому магазину должен быть снова синхронизирован. & c.

Просто сделайте крупнозернистую блокировку, чтобы убедиться, что у вас есть правильность, а затем двигайтесь дальше.

0

Я бы рекомендовал использовать наблюдаемый шаблон наблюдателя. Не нужно заново изобретать колесо. См. Observer and Observable

Я бы также рекомендовал изучить Condition Поскольку нет необходимости блокировать весь объект для всех считывателей. Чтение может быть одновременным, но писать нельзя.

Если коллекция является параллельной, это не означает, что она волшебным образом синхронизирует все. Это просто гарантирует, что их методы являются потокобезопасными. Как только вы покинете область действия, блокировка будет отпущена. Поскольку вам нужен более продвинутый способ управления синхронизацией, лучше всего взять это в свои руки и использовать обычную HashMap.

Некоторые примечания:

Вы перерасхода на HashMap.get. Подумайте о том, чтобы получить его один раз и сохранить его в переменной.

synchronized (prices.get(e)) 

Это может вернуть значение null, и вы должны проверить его. синхронизация по нулевым объектам не допускается.

prices.put(e, currentPrice); 

Я не уверен, что это предназначено, но это действие не требуется. См this

+0

«Это может вернуть значение null, и вы должны проверить его». - Может оказаться сложнее, чем кажется. Вы не можете атомизировать проверку, если объект имеет нулевую и * синхронизацию на нем. – JimmyB

1
  1. как я обеспечить свободный доступ к остальной части ключей, если один находится в процессе обновления.

Просто используя ConcurrentHashMap достаточно, чтобы обеспечить свободный доступ к ключам; get не вносят никаких разногласий, и put s блокирует только подмножество ключей, а не всю карту.

  1. Как я могу гарантировать атомные операции своих методов, так как они требуют многократных операций чтения/записи.

Для обеспечения согласованности необходимо синхронизировать на некоторых общих объектов (или использовать другой механизм блокировки, как ReentrantLock); Я хотел бы предложить создание ConcurrentHashMap<String, Object> объектов блокировки, так что вы можете сделать:

synchronized (locks.get(e)) { ... } 

Просто заполнить карту с new Object()-х гг. Риск с использованием шаблона, который вы используете (блокировки объектов Price), теперь эти объекты должны сохраняться и никогда не заменяться. Легче обеспечить, чтобы это обеспечивало выделенную частную коллекцию блокировок, а не перегрузку вашего типа значений в качестве механизма блокировки.


Как и в сторону, если вы пытаетесь делать денежные операции в Java, вы должны абсолютно быть с помощью Joda-Money библиотеки, а не изобретать велосипед.

1

Использование ConcurrentMap в значительной степени зависит от версии Java. При использовании Java 8 или более поздней версии, вы получите почти все бесплатно:

public final class PriceHolder { 

    private ConcurrentMap<String, Price> prices; 

    public PriceHolder() { 
     this.prices = new ConcurrentHashMap<>(); 

     //Receive starting prices.. 
     Price EUR = new Price(); 
     EUR.setHasChangedSinceLastRead(true); 
     EUR.setPrice(BigDecimal.ZERO); 

     Price USD = new Price(); 
     USD.setHasChangedSinceLastRead(true); 
     USD.setPrice(BigDecimal.ZERO); 
     this.prices.put("EUR", EUR); 
     this.prices.put("USD", USD); 

    } 

    /** Called when a price ‘p’ is received for an entity ‘e’ */ 
    public void putPrice(String e, BigDecimal p) { 
     prices.compute(e, (k, price)-> { 
      if(price==null) price=new Price(); 
      price.setHasChangedSinceLastRead(true); 
      price.setPrice(p); 
      return price; 
     }); 
    } 

    /** Called to get the latest price for entity ‘e’ */ 
    public BigDecimal getPrice(String e) { 
     Price price = prices.computeIfPresent(e, (key, value) -> { 
      value.setHasChangedSinceLastRead(false); 
      return value; 
     }); 
     return price==null? null: price.getPrice(); 
    } 

    /** 
    * Called to determine if the price for entity ‘e’ has 
    * changed since the last call to getPrice(e). 
    */ 
    public boolean hasPriceChanged(String e) { 
     final Price price = prices.get(e); 
     return price!=null && price.isHasChangedSinceLastRead(); 
    } 
} 

В compute… методах на параллельную карте зафиксировать пораженную запись на время вычислений, позволяя обновления всех других записей продолжаются. Для простого доступа к get, как в hasPriceChanged, дополнительная синхронизация не требуется, если вы вызываете ее только один раз в методе, т. Е. Сохраняете результат в локальной переменной при исследовании.


Перед Java 8 все сложнее. Там все ConcurrentMap предлагает определенные методы атомного обновления, которые можно использовать для создания более высокоуровневых методов обновления методом try-and-repeat.

Чтобы использовать его чисто, лучший способ это сделать класс значение неизменяемого:

public final class Price { 

    private final BigDecimal price; 
    private final boolean hasChangedSinceLastRead; 

    Price(BigDecimal value, boolean changed) { 
     price=value; 
     hasChangedSinceLastRead=changed; 
    } 
    public boolean isHasChangedSinceLastRead() { 
     return hasChangedSinceLastRead; 
    } 
    public BigDecimal getPrice() { 
     return price; 
    } 
} 

затем использовать его, чтобы всегда построить новый объект, отражающий желаемое новое состояние и выполнить атомарные обновления с использованием либо putIfAbsent или replace :

public final class PriceHolder { 

    private ConcurrentMap<String, Price> prices; 

    public PriceHolder() { 
     this.prices = new ConcurrentHashMap<>(); 

     //Receive starting prices.. 
     Price EUR = new Price(BigDecimal.ZERO, true); 
     Price USD = EUR; // we can re-use immutable objects... 
     this.prices.put("EUR", EUR); 
     this.prices.put("USD", USD); 
    } 

    /** Called when a price ‘p’ is received for an entity ‘e’ */ 
    public void putPrice(String e, BigDecimal p) { 
     Price old, _new=new Price(p, true); 
     do old=prices.get(e); 
     while(old==null? prices.putIfAbsent(e,_new)!=null: !prices.replace(e,old,_new)); 
    } 

    /** Called to get the latest price for entity ‘e’ */ 
    public BigDecimal getPrice(String e) { 
     for(;;) { 
      Price price = prices.get(e); 
      if(price==null) return null; 
      if(!price.isHasChangedSinceLastRead() 
      || prices.replace(e, price, new Price(price.getPrice(), false))) 
      return price.getPrice(); 
     } 
    } 

    /** 
    * Called to determine if the price for entity ‘e’ has 
    * changed since the last call to getPrice(e). 
    */ 
    public boolean hasPriceChanged(String e) { 
     final Price price = prices.get(e); 
     return price!=null && price.isHasChangedSinceLastRead(); 
    } 
} 
+0

Имейте в виду, что вычисление берет на себя выполнение записи (60M против 1B ops/s). Вероятно, вы захотите использовать схему блокировки с двойной проверкой, чтобы оптимистично иметь производительность чтения в общем случае. –

+1

@Ben Manes: вычислить * есть * напишите здесь. Даже в том случае, если он возвращает существующий объект, он изменит два свойства. Поэтому без смысловой записи потребуется дополнительная блокировка. Оптимистичное чтение было бы полезно, если бы вы предположили, что иногда случается, что конкретной операции не требуется выполнять обновление, но это не задается вопросом. – Holger

+0

Извините, я имел в виду вычислить в общем смысле. В частности, я имел в виду использование 'getPrice'' computeIfPresent'. Поскольку проблема проблемы была проблемой производительности, это показалось целесообразным. –

1

hasChangedMethod возвращает логическое значение, идентифицирующее если цена изменилась с момента последнего времени getPrice было INVO рунец.

Это проблематичный шаблон, поскольку hasPriceChanged по существу нуждается в возвращении чего-то другого в потоке. Если вы можете подробнее рассказать о том, что вы на самом деле пытаетесь сделать (например, почему вы думаете, что вам нужен этот шаблон), возможно, будет предложена альтернатива. Например, рассмотрите возможность полностью отказаться от hasPriceChanged и просто обрабатывать эту структуру данных как каноническую и каждый раз запрашивать ее текущее значение.


Это означает, что я могу реализовать поведение, которое вы ищете. Там могут быть альтернативы, это всего лишь первый проход.

Хранить ConcurrentHashMap<String, ThreadLocal<Boolean>>; ThreadLocal сохранит статус входящих вызовов в потоке. Я также использую отдельную частную карту замков.

ConcurrentHashMap<String, Price> pricesMap; 
ConcurrentHashMap<String, ThreadLocal<Boolean>> seenMap; 
ConcurrentHashMap<String, Object> lockMap; 

private Object getLock(String key) { 
    return lockMap.computeIfAbsent(key, k -> new Object()); 
} 

private ThreadLocal<Boolean> getSeen(String key) { 
    return seenMap.computeIfAbsent(e, 
     ThreadLocal.<Boolean>withInitial(() -> false)); 
} 

public void putPrice(String e, BigDecimal p) { 
    synchronized (getLock(e)) { 
    // price has changed, clear the old state to mark all threads unseen 
    seenMap.remove(e); 
    pricesMap.get(e).setPrice(p); 
    } 
} 

public BigDecimal getPrice(String e) { 
    synchronized (getLock(e)) { 
    // marks the price seen for this thread 
    getSeen(e).set(true); 
    BigDecimal price = pricesMap.get(e); 
    return price != null ? price.getPrice() : null; 
    } 
} 

public boolean hasPriceChanged(String e) { 
    synchronized (getLock(e)) { 
    return !getSeen(e).get(); 
    } 
} 

Обратите внимание, что в то время как структура данных поточно-прежнему существует риск состояния гонки здесь - вы могли бы назвать hasPriceChanged() и получить обратно false, сразу после чего цена изменяется в другом потоке. Выполнение этого поведения hasPriceChanged(), скорее всего, упростит ваш код.

+0

Отлично, позвольте мне попробовать ! Кстати, есть некоторые проблемы с типом, возвращаемым лямбдой в методе 'getSeen()'. –

+0

Простите, я просто закодировал это в ряд, так что могут быть небольшие опечатки. Вы всегда можете написать Java-стиль в случае необходимости. Не стесняйтесь редактировать этот пост с изменениями. – dimo414

0

Лучший способ получить то, что вам нужно, - это, вероятно, перейти к использованию карты ConcurrentMap для карты и поместить всю другую синхронизацию в свой ценовой класс. Это приведет к более простому коду, который всегда очень ценен в многопоточной среде, чтобы избежать тонких ошибок, а также для достижения ваших целей одновременного доступа к карте и отслеживания того, была ли запись с момента последнего чтения для каждой валюты.

В ценовом классе, когда вы устанавливаете цену, вы также хотите установить hasChangedSinceLastRead; эти две вещи объединяются как одна операция, которая должна быть атомарной. Всякий раз, когда вы читаете цену, вы также хотите очистить hasChangedSinceLastRead; это вы также хотите быть атомарным. Таким образом, класс должен разрешать этим двум операциям изменять hasChangedSinceLastRead, а не оставлять логику для других классов, а методы должны быть синхронизированы, чтобы гарантировать, что price и hasChangedSinceLastRead не могут выйти из синхронизации из-за доступа несколькими потоками. Класс должен выглядеть следующим образом:

public class Price { 

    public boolean isHasChangedSinceLastRead() { 
     return hasChangedSinceLastRead; 
    } 

    // setHasChangedSinceLastRead() removed 

    public synchronized BigDecimal getPrice() { 
     hasChangedSinceLastRead = false; 
     return price; 
    } 

    public synchronized void setPrice(BigDecimal newPrice) { 
     if (null != price && price.equals(newPrice) { 
      return; 
     } 
     price = newPrice; 
     hasChangedSinceLastRead = true; 
    } 

    private BigDecimal price; 
    private volatile boolean hasChangedSinceLastRead = false; 
} 

Обратите внимание, что вы можете сделать isHasChangedSinceLastRead() синхронизируется, или hasChangedSinceLastRead летучий; Я выбрал последнее, оставив isHasChangedSinceLastRead() несинхронизированным, потому что для синхронизации метода требуется полный барьер памяти, в то время как для переменной volatile требуется только считываемая половина барьера памяти при чтении переменной.

Причина, по которой чтение hasChangedSinceLastRead требует своего рода барьера памяти, состоит в том, что синхронизированные методы, блоки и нестабильный доступ гарантируют только эффективный порядок выполнения - отношения «происходит до» - с другими синхронизированными методами, блоками и изменчивым доступом. Если isHasChangedSinceLastRead не синхронизирован, и переменная нестабильна, отношения «происходит до» не существует; в этом случае isHasChangedSinceLastRead может возвращать «true», но поток, вызывающий его, может не увидеть изменения цены. Это связано с тем, что установка price и hasChangedSinceLastRead в setPrice() может быть замечена в обратном порядке другими потоками, если связь «произойдет до» не установлена.

Теперь, когда вся необходимая синхронизация находится в классах ConcurrentMap и Price, вам больше не нужно выполнять какую-либо синхронизацию в PriceHolder, а PriceHolder больше не нужно беспокоиться об обновлении hasChangedSinceLastRead. Код упрощаются:

public final class PriceHolder { 

    private ConcurrentMap<String, Price> prices; 

    public PriceHolder() { 
     prices = new ConcurrentHashMap<>(); 

     //Receive starting prices.. 
     Price EUR = new Price(); 
     EUR.setPrice(new BigDecimal(0)); 
     this.prices.put("EUR", EUR); 

     Price USD = new Price(); 
     USD.setPrice(new BigDecimal(0)); 
     this.prices.put("USD", USD); 
    } 

    /** Called when a price ‘p’ is received for an entity ‘e’ */ 
    public void putPrice(
     String e, 
     BigDecimal p 
    ) throws InterruptedException { 
     Price currentPrice = prices.get(e); 
     if (null == currentPrice) { 
      currentPrice = new Price(); 
      currentPrice = prices.putIfAbsent(e); 
     } 
     currentPrice.setPrice(p); 
    } 

    /** Called to get the latest price for entity ‘e’ */ 
    public BigDecimal getPrice(String e) { 
     Price currentPrice = prices.get(e); 
     if (currentPrice != null){ 
      return currentPrice.getPrice(); 
     } 
     return null; 
    } 

    /** 
    * Called to determine if the price for entity ‘e’ has 
    * changed since the last call to getPrice(e). 
    */ 
    public boolean hasPriceChanged(String e) { 
     Price currentPrice = prices.get(e); 
     return null != currentPrice ? currentPrice.isHasChangedSinceLastRead() : false; 
    } 
} 
+0

Ничего себе! Очень просто и чисто. Не могли бы вы объяснить, почему нам нужно 'volatile' на' hasChangedSinceLastRead'? Я думал, что если мы поместим 'synchronized' в метод, он заблокирует весь объект. –

+0

Добавлен абзац, объясняющий, почему либо isHasChangedSinceLastRead() должен быть синхронизирован, либо hasChangedSinceLastRead должен быть изменчивым. Либо один достаточно, но нам нужен один из двух. Я выбрал последнее, потому что у него меньше накладных расходов. Подробнее в отредактированном ответе. –