Объединить putIfAbsent и заменить ConcurrentMap

У меня есть usecase, где я должен

  • вставить новое значение, если ключ не существует в ConcurrentHashMap
  • замените старое значение новым значением, если ключ уже существует в ConcurrentHashMap, где новое значение получается из старого значения (не дорогостоящая операция).

У меня есть следующий код:

public void insertOrReplace(String key, String value) {
        boolean updated = false;
        do {
            String oldValue = concurrentMap.get(key);
            if (oldValue == null) {
                oldValue = concurrentMap.putIfAbsent(key, value);
                if (oldValue == null) {
                    updated = true;
                }
            }
            if (oldValue != null) {
                final String newValue = recalculateNewValue(oldValue, value);
                updated = concurrentMap.replace(key, oldValue, newValue);
            }
        } while (!updated);
    }

Считаете ли вы его правильным и потокобезопасным?

Есть ли более простой способ?

Ответы

Ответ 1

Вы можете сделать это немного короче с приведенным ниже кодом, который эквивалентен вашему. У меня есть стресс, который протестировал его немного с тысячами потоков, обращающихся к нему одновременно: он работает так, как ожидалось, с выполнением ряда попыток (циклов) (очевидно, вы никогда не сможете доказать правильность при тестировании в параллельном мире).

public void insertOrReplace(String key, String value) {
    for (;;) {
        String oldValue = concurrentMap.putIfAbsent(key, value);
        if (oldValue == null)
            return;

        final String newValue = recalculateNewValue(oldValue, value);
        if (concurrentMap.replace(key, oldValue, newValue))
            return;
    }
}

Ответ 2

Ваш метод кажется потокобезопасным. Если вам не нужны преимущества производительности ConcurrentHashMap, попробуйте вместо этого использовать обычный HashMap и синхронизировать весь доступ к нему. Ваш метод похож на AtomicInteger.getAndSet(int), поэтому все должно быть хорошо. Я сомневаюсь, что есть более простой способ сделать это, если вы не ищете библиотечный вызов для выполнения этой работы для вас.

Ответ 3

Я не думаю, что это правильно. Насколько я понимаю, метод merge() будет правильным инструментом для работы. В настоящее время у меня такая же проблема, и я написал тест litte, чтобы увидеть результаты.

В этом тесте запущено 100 человек. Каждый из них увеличивает значение на карте 100 раз. Таким образом, ожидаемый результат будет 10000.

Существует два типа рабочих. Тот, который использует алгоритм замены, и на этом использует слияние. Тест выполняется два раза с различными реализациями.

import java.util.concurrent.ArrayBlockingQueue;                                                                     
import java.util.concurrent.ConcurrentHashMap;                                                                      
import java.util.concurrent.ConcurrentMap;                                                                          
import java.util.concurrent.ExecutorService;                                                                        
import java.util.concurrent.ThreadPoolExecutor;                                                                     
import java.util.concurrent.TimeUnit;                                                                               

public class ConcurrentMapTest                                                                                      
{                                                                                                                   

   private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();                                   

   private final class ReplaceWorker implements Runnable                                                            
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));                                       
            if(putIfAbsent == null)                                                                                 
               return;                                                                                              
            map.replace("key", putIfAbsent + 1);                                                                    
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   private final class MergeWorker implements Runnable                                                              
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            map.merge("key", Integer.valueOf(1), (ov, nv) -> {                                                      
               return ov + 1;                                                                                       
            });                                                                                                     
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   public MergeWorker newMergeWorker()                                                                              
   {                                                                                                                
      return new MergeWorker();                                                                                     
   }                                                                                                                

   public ReplaceWorker newReplaceWorker()                                                                          
   {                                                                                                                
      return new ReplaceWorker();                                                                                   
   }                                                                                                                

   public static void main(String[] args)                                                                           
   {                                                                                                                
      map.put("key", 1);                                                                                            
      ConcurrentMapTest test = new ConcurrentMapTest();                                                             
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newMergeWorker());                                                                  
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      

      map.put("key", 1);                                                                                            
      threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));      
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newReplaceWorker());                                                                
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      
   }                                                                                                                

   private static void awaitTermination(ExecutorService threadPool)                                                 
   {                                                                                                                
      try                                                                                                           
      {                                                                                                             
         threadPool.shutdown();                                                                                     
         boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);                               
         System.out.println("terminted successfull: " + awaitTermination);                                          
      }                                                                                                             
      catch (InterruptedException e)                                                                                
      {                                                                                                             
         // TODO Auto-generated catch block                                                                         
         e.printStackTrace();                                                                                       
      }                                                                                                             
   }                                                                                                                
}                                                                                          
result:
terminted successfull: true
10000
terminted successfull: true
1743

Проблема в том, что в вашем случае существует разрыв между get и put, поэтому при одновременном доступе к результатам карты будут перезаписаны. Слияние - это атомная операция, хотя в документации ничего не говорится об этом.

Ответ 4

Вы можете использовать MutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter) из Коллекции Eclipse.

Аргумент factory создает начальное значение, если на карте нет ни одного. Аргумент function применяется к значению карты вместе с дополнительным параметром, чтобы создать новое значение карты. Этот parameter передается как окончательный аргумент updateValueWith(). Функция вызывается даже в том случае, если ключ не был на карте. Таким образом, начальное значение действительно function применяется к выходу factory и parameter. function не должен мутировать значение; он должен вернуть новое значение. В вашем примере значениями карты являются строки, которые являются неизменными, поэтому мы в порядке.

В ConcurrentMaps, например org.eclipse.collections.impl.map.mutable.ConcurrentHashMap, реализация updateValueWith() также является потокобезопасной и атомной. Важно, чтобы function не изменял значения карты, иначе он не был бы потокобезопасным. Вместо этого он должен возвращать новые значения. В вашем примере значениями карты являются строки, которые являются неизменными, поэтому мы в порядке.

Если ваш метод recalculateNewValue() просто выполняет конкатенацию строк, вот как вы можете использовать updateValueWith().

Function0<String> factory = () -> "initial ";
Function2<String, String, String> recalculateNewValue = String::concat;

MutableMap<String, String> map = new ConcurrentHashMap<>();
map.updateValueWith("test", factory, recalculateNewValue, "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.updateValueWith("test", factory, recalculateNewValue, "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));

Вы можете использовать Java 8 ConcurrentMap.compute(клавиша K, функция переназначения BiFunction), чтобы выполнить одно и то же, но имеет несколько недостатков.

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
  • Нет отдельного factory для обработки случая отсутствующих ключей, поэтому тело лямбда должно иметь дело со значениями и начальными значениями.
  • API не подлежит повторному использованию lambdas. Каждый вызов updateValueWith() разделяет одни и те же лямбды, но каждый вызов compute() создает новый мусор в куче.

Примечание: я являюсь коммиттером для коллекций Eclipse