Обработчик блокировки для произвольных ключей

У меня есть код, который реализует "обработчик блокировки" для произвольных ключей. Учитывая key, он гарантирует, что только один поток за раз может process, который (или равен) (что здесь означает вызов externalSystem.process(key)).

До сих пор у меня такой код:

public class MyHandler {
    private final SomeWorkExecutor someWorkExecutor;
    private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>();

    public void handle(Key key) {
        // This can lead to OOM as it creates locks without removing them
        Lock keyLock = lockMap.computeIfAbsent( 
            key, (k) -> new ReentrantLock()
        );
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

Я понимаю, что этот код может привести к OutOfMemoryError, потому что нет однозначной карты.

Я думаю о том, как создать карту, которая будет накапливать ограниченное количество элементов. Когда предел будет превышен, мы должны заменить старый элемент доступа новым (этот код должен быть синхронизирован с самым старым элементом в качестве монитора). Но я не знаю, как иметь обратный вызов, который скажет мне, что предел превышен.

Поделитесь своими мыслями.

P.S.

Я перечитываю задачу, и теперь вижу, что у меня есть ограничение, что метод handle не может быть вызван более чем 8 потоками. Я не знаю, как это может мне помочь, но я только что упомянул об этом.

P.S.2

от @Boris Spider было предложено приятное и простое решение:

} finally {
      lockMap.remove(key);
      keyLock.unlock();
}

Но после того, как Борис заметил, что код нам не потокобезопасен, потому что он нарушает поведение:
позволяет исследовать 3 потока, вызываемых с одинаковым ключом:

  • Thread # 1 получает блокировку и теперь до map.remove(key);
  • Thread # 2 вызывает ключ равенства, чтобы он дождался блокировки потока # 1.
  • тогда поток # 1 выполнить map.remove(key);. После этого потока # 3 вызывается метод handle. Он проверяет, что блокировка для этого ключа отсутствует на карте, таким образом, он создает новую блокировку и приобретает ее.
  • Тема № 1 освобождает замок, и, таким образом, поток # 2 приобретает его.
    Таким образом, поток # 2 и поток # 3 можно вызывать параллельно для равных ключей. Но это не должно допускаться.

Чтобы избежать этой ситуации, перед очисткой карты мы должны заблокировать любой поток, чтобы получить блокировку, в то время как все потоки из waitset не приобретают и не освобождают блокировку. Похоже, что достаточно сложной синхронизации, и это приведет к медленному функционированию алгоритма. Возможно, нам нужно время от времени очищать карту, когда размер карты превышает некоторое ограниченное значение.

Я потратил много времени, но, к сожалению, у меня нет идей, как этого добиться.

Ответы

Ответ 1

Вам не нужно пытаться ограничить размер каким-либо произвольным значением - как оказалось, вы можете выполнить этот тип именования "lock handler", только сохраняя точно количество ключей, заблокированных в настоящее время на карте.

Идея заключается в использовании простого соглашения: успешное добавление сопоставления к числу карт в качестве операции блокировки и удаление его считается "разблокировкой". Это аккуратно избегает проблемы удаления отображения, в то время как какой-то поток по-прежнему заблокирован и другие условия гонки.

В этот момент value в сопоставлении используется только для блокировки других потоков, которые поступают с одним и тем же ключом, и должны ждать, пока не будет удалено сопоставление.

Здесь пример 1 с CountDownLatch, а не Lock в качестве значения карты:

public void handle(Key key) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    // try to acquire the lock by inserting our latch as a
    // mapping for key        
    while(true) {
        CountDownLatch existing = lockMap.putIfAbsent(key, latch);
        if (existing != null) {
            // there is an existing key, wait on it
            existing.await();
        } else {
            break;
        }
    }

    try {
        externalSystem.process(key);
    } finally {
        lockMap.remove(key);
        latch.countDown();
    }
}

Здесь время жизни отображения сохраняется до тех пор, пока удерживается блокировка. На карте никогда не будет больше записей, чем есть параллельные запросы для разных ключей.

Разница с вашим подходом заключается в том, что сопоставления не "повторно используются" - каждый вызов handle создает новую защелку и сопоставление. Поскольку вы уже выполняете дорогостоящие атомные операции, это вряд ли будет значительным замедление на практике. Другим недостатком является то, что со многими ожидающими потоками все разбужаются, когда защелка подсчитывается, но только одному удастся помещать новое отображение и, следовательно, приобретать блокировку - остальные снова возвращаются к сне на новом замке.

Вы можете создать другую версию этого, которая повторно использует сопоставления, когда потоки придут и ожидают существующего сопоставления. В принципе, разблокирующий поток просто выполняет "передачу обслуживания" одному из ожидающих потоков. Только одно сопоставление будет использоваться для всего набора потоков, которые ждут того же ключа - он передается каждому из них последовательно. Размер по-прежнему ограничен, потому что на заданном сопоставлении не осталось ни одного потока.

Чтобы реализовать это, вы заменяете CountDownLatch на значение карты, которое может подсчитывать количество ожидающих потоков. Когда поток выполняет разблокировку, он сначала проверяет, ждут ли какие-либо потоки, и, если это так, просыпается, чтобы выполнить передачу обслуживания. Если нити не ждут, он "уничтожает" объект (т.е. Устанавливает флаг, что объект больше не находится в отображении) и удаляет его с карты.

Вам нужно выполнить описанные выше манипуляции под правильным замком, и есть несколько сложных деталей. На практике я нахожу, что короткий и сладкий пример выше отлично работает.


1 Написано "на лету", не скомпилировано и не проверено, но идея работает.

Ответ 2

Вы можете положиться на метод compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) для синхронизации вызовов вашего метода process для заданного ключа, вы не даже больше нужно использовать Lock как тип значений вашей карты, поскольку вы больше не полагаетесь на нее.

Идея состоит в том, чтобы полагаться на механизм внутреннего блокирования вашего ConcurrentHashMap для выполнения вашего метода, это позволит потокам параллельно выполнять метод process для ключей, чьи хэши не являются частью одного и того же бина. Это эквивалентно подходу, основанному на полосатых блокировках, за исключением того, что вам не нужна дополнительная сторонняя библиотека.

Подход полосатых замков интересен тем, что он очень светлый с точки зрения памяти, поскольку для этого требуется только ограниченное количество замков, поэтому объем памяти, необходимый для ваших замков, известен и никогда не изменяется, что не случай подходов, которые используют один замок для каждого ключа (например, в вашем вопросе), так что обычно рекомендуется/рекомендуется использовать подходы на основе полосатых замков для такой необходимости.

Итак, ваш код может быть примерно таким:

// This will create a ConcurrentHashMap with an initial table size of 16   
// bins by default, you may provide an initialCapacity and loadFactor
// if too much or not enough to get the expected table size in order
// increase or reduce the concurrency level of your map
// NB: We don't care much of the type of the value so I arbitrarily
// used Void but it could be any type like simply Object
private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>();

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.compute(
        lockKey,
        (key, value) -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

NB 1: Поскольку мы всегда возвращаем null, карта всегда будет пустой, так что из-за этой карты вы никогда не закончите память.

NB 2: Поскольку мы никогда не влияем на значение для данного ключа, обратите внимание, что это также можно сделать с помощью метода computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction):

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.computeIfAbsent(
        lockKey,
        key -> {
            // Execute the process method under the protection of the
            // lock of the segment of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

NB 3: Убедитесь, что ваш метод process никогда не вызывает метод handle для любых клавиш, так как вы закончите бесконечные циклы (тот же ключ) или взаимоблокировки (другие не упорядоченные ключи, например: Если один поток вызывает handle(key1), а затем process внутренне вызывает handle(key2), а другой поток вызывает параллель handle(key2), а затем process внутренне вызывает handle(key1), вы получите тупик независимо от используемого подхода). Это поведение не является специфическим для этого подхода, оно будет происходить с любыми подходами.

Ответ 3

Один из подходов состоит в том, чтобы полностью отказаться от совместной карты хэша и просто использовать регулярный HashMap с блокировкой для выполнения требуемой манипуляции с картой и состоянием блокировки атомарно.

На первый взгляд это уменьшает concurrency системы, но если мы предположим, что вызов process(key) длинный относительно очень быстрых манипуляций с блокировками, он хорошо работает, поскольку вызовы process() все еще выполняются одновременно, В исключительном критическом разделе происходит лишь небольшое и фиксированное количество работ.

Здесь эскиз:

public class MyHandler {

    private static class LockHolder {
        ReentrantLock lock = new ReentrantLock();
        int refcount = 0;
        void lock(){
            lock.lock();
        }
    } 

    private final SomeWorkExecutor someWorkExecutor;
    private final Lock mapLock = new ReentrantLock();
    private final HashMap<Key, LockHolder> lockMap = new HashMap<>();

    public void handle(Key key) {

        // lock the map
        mapLock.lock();
        LockHolder holder = lockMap.computeIfAbsent(key, k -> new LockHolder());
        // the lock in holder is either unlocked (newly created by us), or an existing lock, let increment refcount
        holder.refcount++;
        mapLock.unlock();

        holder.lock();

        try {
            someWorkExecutor.process(key);
        } finally {
            mapLock.lock()
            keyLock.unlock();
            if (--holder.refcount == 0) {
              // no more users, remove lock holder
              map.remove(key);
            }
            mapLock.unlock();
        }
    }
}

Мы используем refcount, который управляется только под общим mapLock, чтобы отслеживать, сколько пользователей блокировки есть. Всякий раз, когда refcount равен нулю, мы можем избавиться от записи при выходе из обработчика. Этот подход хорош в том, что его довольно легко рассуждать и будет хорошо работать, если вызов process() относительно дорог по сравнению с накладными расходами на блокировку. Поскольку манипуляция с картами происходит под общей блокировкой, также легко добавить дополнительную логику, например, сохранить некоторые объекты Holder на карте, отслеживать статистику и т.д.

Ответ 4

Спасибо Ben Mane
Я нашел этот вариант.

public class MyHandler {
    private final int THREAD_COUNT = 8;
    private final int K = 100;
    private final Striped<Lock> striped = Striped.lazyWeakLock(THREAD_COUNT * K);
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = striped.get(key);

        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }       
    }
}

Ответ 5

Вот короткая и сладкая версия, которая использует weak версию Guava Interner, чтобы сделать серьезный подъем с помощью "канонического" объекта для каждого ключа, который будет использоваться в качестве блокировки, и реализации слабой ссылочной семантики, чтобы очищенные неиспользуемые записи были очищены.

public class InternerHandler {
    private final Interner = Interners.newWeakInterner();

    public void handle(Key key) throws InterruptedException {
        Key canonKey = Interner.intern(key);
        synchronized (canonKey) {
            someWorkExecutor.process(key);
        }       
    }
}

В основном мы запрашиваем канонический canonKey, который находится от equal() до key, а затем блокируем этот canonKey. Все согласятся на канонический ключ, и, следовательно, все вызывающие, которые передают равные ключи, согласятся на объект, на который нужно заблокировать.

Слабая природа Interner означает, что в любое время, когда канонический ключ не используется, запись может быть удалена, поэтому вы избегаете накопления записей в интернере. Позже, если снова появляется равный ключ, выбирается новая каноническая запись.

Простой код выше полагается на встроенный монитор на synchronize - но если это не работает для вас (например, оно уже используется для другой цели), вы можете включить объект блокировки в классе key или создать объект-держатель.

Ответ 6

class MyHandler {
    private final Map<Key, Lock> lockMap = Collections.synchronizedMap(new WeakHashMap<>());
    private final SomeWorkExecutor someWorkExecutor = new SomeWorkExecutor();

    public void handle(Key key) throws InterruptedException {
        Lock keyLock = lockMap.computeIfAbsent(key, (k) -> new ReentrantLock()); 
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

Ответ 7

Создание и удаление объекта блокировки для key каждый раз является дорогостоящей операцией с точки зрения производительности. Когда вы добавляете/удаляете блокировку с параллельной карты (например, кеш), она должна быть гарантией того, что удаление/удаление объекта из кеша само по себе является потокобезопасным. Таким образом, это выглядит не очень хорошо, но может быть реализовано с помощью ConcurrentHashMap

Метод блокировки полосы (также используемый внутренней картой хэша внутри) - лучший подход. Из Документы Google Guava объясняется как

Если вы хотите связать блокировку с объектом, ключ гарантирует вам нужно, чтобы if key1.equals(key2), затем блокировка, связанная с key1 совпадает с блокировкой, связанной с ключом2.

Самый грубый способ сделать это - связать каждую клавишу с тем же блокировка, что приводит к самой грубой синхронизации. На с другой стороны, вы можете связать каждый отдельный ключ с другим блокировка, но для этого требуется потребление линейной памяти и concurrencyуправление для самой системы блокировок, поскольку открываются новые ключи.

Striped позволяет программисту выбирать несколько блокировок, которые распределенных между ключами на основе их хеш-кода. Это позволяет программист для динамического выбора компромисса между concurrency и потребления памяти, сохраняя при этом ключ-инвариант, который, если key1.equals(key2), затем striped.get(key1) == striped.get(key2)

код:

//declare globally; e.g. class field level
Striped<Lock> rwLockStripes = Striped.lock(16);

    Lock lock = rwLockStripes.get("key");
    lock.lock();
    try {
        // do you work here
    } finally {
        lock.unlock();
    }

После обрезки кода может помочь в реализации установки/удаления блокировки.

private ConcurrentHashMap<String, ReentrantLock> caches = new ConcurrentHashMap<>();

public void processWithLock(String key) {
    ReentrantLock lock = findAndGetLock(key);
    lock.lock();
    try {
        // do you work here

    } finally {
        unlockAndClear(key, lock);
    }
}

private void unlockAndClear(String key, ReentrantLock lock) {
    // *** Step 1: Release the lock.
    lock.unlock();
    // *** Step 2: Attempt to remove the lock
    // This is done by calling compute method, if given lock is present in
    // cache. if current lock object in cache is same instance as 'lock'
    // then remove it from cache. If not, some other thread is succeeded in
    // putting new lock object and hence we can leave the removal of lock object to that
    // thread.
    caches.computeIfPresent(key, (k, current) -> lock == current ? null : current);

}

private ReentrantLock findAndGetLock(String key) {
    // Merge method given us the access to the previously( if available) and
    // newer lock object together.
    return caches.merge(key, new ReentrantLock(), (older, newer) -> nonNull(older) ? older : newer);
}

Ответ 8

Вместо того, чтобы писать самостоятельно, вы можете попробовать что-то вроде JKeyLockManager. Из описания проектов:

JKeyLockManager обеспечивает мелкозернистую блокировку с помощью приложения специальные ключи.

Пример кода, приведенный на сайте:

public class WeatherServiceProxy {
  private final KeyLockManager lockManager = KeyLockManagers.newManager();

  public void updateWeatherData(String cityName, float temperature) {
    lockManager.executeLocked(cityName, () -> delegate.updateWeatherData(cityName, temperature)); 
  }

Ответ 9

Новые значения будут добавлены при вызове

lockMap.computeIfAbsent()

Итак, вы можете просто проверить lockMap.size() для подсчета элементов.

Но как вы собираетесь найти первый добавленный предмет? было бы лучше просто удалить предметы после того, как вы их использовали.

Ответ 10

Вы можете использовать кеш процесса, который хранит ссылки на объекты, такие как Caffeine, Guava, EHCache или cache2k. Вот пример построения кеша с cache2k:

final Cache<Key, Lock> locks =
  new Cache2kBuilder<Key, Lock>(){}
    .loader(
      new CacheLoader<Key, Lock>() {
        @Override
        public Lock load(Key o) {
          return new ReentrantLock();
        }
      }
    )
    .storeByReference(true)
    .entryCapacity(1000)
    .build();

Шаблон использования такой же, как у вас в вопросе:

    Lock keyLock = locks.get(key);
    keyLock.lock();
    try {
        externalSystem.process(key);
    } finally {
        keyLock.unlock();
    }

Поскольку кеш ограничен 1000 записями, происходит автоматическая очистка блокировок, которые больше не используются.

Существует вероятность того, что блокировка, используемая кэшем, будет выведена, если емкость и количество потоков в приложении несовместимы. Это решение прекрасно работает годами в наших приложениях. Кэш вытеснит блокировку, которая используется, когда есть достаточно длительная работа и превышена пропускная способность. В реальном приложении вы всегда контролируете количество жизненных потоков, например. в веб-контейнере вы должны ограничить количество потоков обработки (пример) 100. Знаете, что в использовании не более 100 блокировок. Если это учитывается, это решение имеет минимальные накладные расходы.

Имейте в виду, что блокировка работает только до тех пор, пока ваше приложение работает на одной виртуальной машине. Вы можете взглянуть на распределенных менеджеров блокировок (DLM). Примеры продуктов, которые обеспечивают распределенные блокировки: орешник, бесконечный, теракотта, redis/redisson.