Проектирование блокировки на основе ключа (или блокировки карты)
Я пытаюсь создать механизм блокировки на основе ключей: что-то вроде обычной блокировки реентера, но вместо блокировки() и unlock() вы блокируете (ключ) и разблокируете (ключ), с контрактом, что нет -one сможет блокировать (key1) одновременно, если key.equals(key1).
Будет ли этот код работать? Существуют ли более эффективные решения? Мне особенно не нравится цикл while, пытаясь поместить замок на карту...
package luca;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
public class KeyedReentrantLock<K> {
private ConcurrentMap<K, ReentrantLock> lockMap = new ConcurrentHashMap<K, ReentrantLock>();
public void lock(K key) {
ReentrantLock oldLock = lockMap.get(key);
if (oldLock != null && oldLock.isHeldByCurrentThread()){
// increase lock count and return.
oldLock.lock();
return;
}
ReentrantLock newLock = new ReentrantLock();
newLock.lock();
while ((oldLock = lockMap.putIfAbsent(key, newLock)) != null){
// wait for the old lock to be released;
oldLock.lock();
oldLock.unlock();
}
return;
}
public void unlock(K key){
ReentrantLock lock = lockMap.get(key);
if (lock == null) throw new IllegalMonitorStateException("There was no lock for this key!");
if (lock.getHoldCount() == 1){
lockMap.remove(key);
}
lock.unlock();
}
}
Ответы
Ответ 1
Почему бы просто не использовать простое полосатое блокирование, например:
/**
* Striped locks holder, contains array of {@link java.util.concurrent.locks.ReentrantLock}, on which lock/unlock
* operations are performed. Purpose of this is to decrease lock contention.
* <p>When client requests lock, it gives an integer argument, from which target lock is derived as follows:
* index of lock in array equals to <code>id & (locks.length - 1)</code>.
* Since <code>locks.length</code> is the power of 2, <code>locks.length - 1</code> is string of '1' bits,
* and this means that all lower bits of argument are taken into account.
* <p>Number of locks it can hold is bounded: it can be from set {2, 4, 8, 16, 32, 64}.
*/
public class StripedLock {
private final ReentrantLock[] locks;
/**
* Default ctor, creates 16 locks
*/
public StripedLock() {
this(4);
}
/**
* Creates array of locks, size of array may be any from set {2, 4, 8, 16, 32, 64}
* @param storagePower size of array will be equal to <code>Math.pow(2, storagePower)</code>
*/
public StripedLock(int storagePower) {
if (!(storagePower >= 1 && storagePower <= 6)) { throw new IllegalArgumentException("storage power must be in [1..6]"); }
int lockSize = (int) Math.pow(2, storagePower);
locks = new ReentrantLock[lockSize];
for (int i = 0; i < locks.length; i++)
locks[i] = new ReentrantLock();
}
/**
* Locks lock associated with given id.
* @param id value, from which lock is derived
*/
public void lock(int id) {
getLock(id).lock();
}
/**
* Unlocks lock associated with given id.
* @param id value, from which lock is derived
*/
public void unlock(int id) {
getLock(id).unlock();
}
/**
* Map function between integer and lock from locks array
* @param id argument
* @return lock which is result of function
*/
private ReentrantLock getLock(int id) {
return locks[id & (locks.length - 1)];
}
}
Ответ 2
Пожалуйста, обратитесь к следующему образцу кода, я создал новую блокировку для каждого потока.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
public class Processor implements Runnable {
final static ConcurrentHashMap<Integer, ReentrantReadWriteLock> CONCURRENT_HASH_MAP = new ConcurrentHashMap<Integer, ReentrantReadWriteLock>();
final private Employee employee;
public Processor(int id) {
this.employee = new Employee(id);
}
public void run() {
processDate(employee);
}
/**
* Method to be shared
*
* @param id
*/
public void processDate(final Employee employee) {
final int employeeId = employee.getId();
ReentrantReadWriteLock monitoredObject = new ReentrantReadWriteLock();
System.out.println("Before taking the lock"
+ Thread.currentThread().getName());
while (CONCURRENT_HASH_MAP.putIfAbsent(employeeId, monitoredObject) != null) {
}
ReadLock lock = monitoredObject.readLock();
lock.lock();
try {
processXML(employee);
} catch (Exception e) {
e.printStackTrace();
}
CONCURRENT_HASH_MAP.remove(employeeId);
lock.unlock();
}
/**
* For similar id object this will run one by one but for different objects
* this will run parallal.
*
* This method will execute serially if called by multiple threads for
* employee with same emp id
*
* @throws Exception
*/
public void processXML(final Employee employee) throws Exception {
System.out.println("Process XML for " + employee.getId()
+ Thread.currentThread().getName());
Thread.sleep(2000);
System.out.println("Done XML Processing for " + employee.getId()
+ Thread.currentThread().getName());
ReentrantReadWriteLock lock = CONCURRENT_HASH_MAP.get(employee.getId());
System.out.println("lock object " + lock + "queue length "
+ lock.getQueueLength());
}
class Employee {
private Integer id;
public Employee(final int id) {
this.id = id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getId() {
return id;
}
}
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
/**
* In Processors Constructor instead of i write 1 and see the
* difference.
*/
for (int i = 1; i <= 5; i++)
executorService.submit(new Processor(i));
executorService.shutdown();
/*
* Let the main thread wait till the executor service is terminated to
* observe the total time taken
*/
while (executorService.isTerminated() != true) {
}
long endTime = System.currentTimeMillis();
long timeTaken = endTime - startTime;
System.out.println("time taken.... " + timeTaken + " ms");
}
}