Объединить putIfAbsent и заменить ConcurrentMap
У меня есть usecase, где я должен
- вставить новое значение, если ключ не существует в ConcurrentHashMap
- замените старое значение новым значением, если ключ уже существует в
ConcurrentHashMap, где новое значение получается из старого значения (не дорогостоящая операция).
У меня есть следующий код:
public void insertOrReplace(String key, String value) {
boolean updated = false;
do {
String oldValue = concurrentMap.get(key);
if (oldValue == null) {
oldValue = concurrentMap.putIfAbsent(key, value);
if (oldValue == null) {
updated = true;
}
}
if (oldValue != null) {
final String newValue = recalculateNewValue(oldValue, value);
updated = concurrentMap.replace(key, oldValue, newValue);
}
} while (!updated);
}
Считаете ли вы его правильным и потокобезопасным?
Есть ли более простой способ?
Ответы
Ответ 1
Вы можете сделать это немного короче с приведенным ниже кодом, который эквивалентен вашему. У меня есть стресс, который протестировал его немного с тысячами потоков, обращающихся к нему одновременно: он работает так, как ожидалось, с выполнением ряда попыток (циклов) (очевидно, вы никогда не сможете доказать правильность при тестировании в параллельном мире).
public void insertOrReplace(String key, String value) {
for (;;) {
String oldValue = concurrentMap.putIfAbsent(key, value);
if (oldValue == null)
return;
final String newValue = recalculateNewValue(oldValue, value);
if (concurrentMap.replace(key, oldValue, newValue))
return;
}
}
Ответ 2
Ваш метод кажется потокобезопасным. Если вам не нужны преимущества производительности ConcurrentHashMap, попробуйте вместо этого использовать обычный HashMap и синхронизировать весь доступ к нему. Ваш метод похож на AtomicInteger.getAndSet(int), поэтому все должно быть хорошо. Я сомневаюсь, что есть более простой способ сделать это, если вы не ищете библиотечный вызов для выполнения этой работы для вас.
Ответ 3
Я не думаю, что это правильно. Насколько я понимаю, метод merge() будет правильным инструментом для работы. В настоящее время у меня такая же проблема, и я написал тест litte, чтобы увидеть результаты.
В этом тесте запущено 100 человек. Каждый из них увеличивает значение на карте 100 раз. Таким образом, ожидаемый результат будет 10000.
Существует два типа рабочих. Тот, который использует алгоритм замены, и на этом использует слияние. Тест выполняется два раза с различными реализациями.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConcurrentMapTest
{
private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
private final class ReplaceWorker implements Runnable
{
public void run()
{
for(int i = 0; i<100; i++)
{
Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));
if(putIfAbsent == null)
return;
map.replace("key", putIfAbsent + 1);
}
}
}
private final class MergeWorker implements Runnable
{
public void run()
{
for(int i = 0; i<100; i++)
{
map.merge("key", Integer.valueOf(1), (ov, nv) -> {
return ov + 1;
});
}
}
}
public MergeWorker newMergeWorker()
{
return new MergeWorker();
}
public ReplaceWorker newReplaceWorker()
{
return new ReplaceWorker();
}
public static void main(String[] args)
{
map.put("key", 1);
ConcurrentMapTest test = new ConcurrentMapTest();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu
for(int i = 0; i<100; i++)
{
threadPool.submit(test.newMergeWorker());
}
awaitTermination(threadPool);
System.out.println(test.map.get("key"));
map.put("key", 1);
threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
for(int i = 0; i<100; i++)
{
threadPool.submit(test.newReplaceWorker());
}
awaitTermination(threadPool);
System.out.println(test.map.get("key"));
}
private static void awaitTermination(ExecutorService threadPool)
{
try
{
threadPool.shutdown();
boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("terminted successfull: " + awaitTermination);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
result:
terminted successfull: true
10000
terminted successfull: true
1743
Проблема в том, что в вашем случае существует разрыв между get и put, поэтому при одновременном доступе к результатам карты будут перезаписаны. Слияние - это атомная операция, хотя в документации ничего не говорится об этом.
Ответ 4
Вы можете использовать MutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter)
из Коллекции Eclipse.
Аргумент factory
создает начальное значение, если на карте нет ни одного. Аргумент function
применяется к значению карты вместе с дополнительным параметром, чтобы создать новое значение карты. Этот parameter
передается как окончательный аргумент updateValueWith()
. Функция вызывается даже в том случае, если ключ не был на карте. Таким образом, начальное значение действительно function
применяется к выходу factory
и parameter
. function
не должен мутировать значение; он должен вернуть новое значение. В вашем примере значениями карты являются строки, которые являются неизменными, поэтому мы в порядке.
В ConcurrentMaps, например org.eclipse.collections.impl.map.mutable.ConcurrentHashMap
, реализация updateValueWith()
также является потокобезопасной и атомной. Важно, чтобы function
не изменял значения карты, иначе он не был бы потокобезопасным. Вместо этого он должен возвращать новые значения. В вашем примере значениями карты являются строки, которые являются неизменными, поэтому мы в порядке.
Если ваш метод recalculateNewValue()
просто выполняет конкатенацию строк, вот как вы можете использовать updateValueWith()
.
Function0<String> factory = () -> "initial ";
Function2<String, String, String> recalculateNewValue = String::concat;
MutableMap<String, String> map = new ConcurrentHashMap<>();
map.updateValueWith("test", factory, recalculateNewValue, "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.updateValueWith("test", factory, recalculateNewValue, "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
Вы можете использовать Java 8 ConcurrentMap.compute(клавиша K, функция переназначения BiFunction), чтобы выполнить одно и то же, но имеет несколько недостатков.
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
- Нет отдельного factory для обработки случая отсутствующих ключей, поэтому тело лямбда должно иметь дело со значениями и начальными значениями.
- API не подлежит повторному использованию lambdas. Каждый вызов
updateValueWith()
разделяет одни и те же лямбды, но каждый вызов compute()
создает новый мусор в куче.
Примечание: я являюсь коммиттером для коллекций Eclipse