Java concurrency шаблон для внешнего общего ресурса (смарт-карты)
У меня есть служба веб-сервера, где клиенты запрашивают вычисления смарт-карт и получают их результат.
Доступный номер смарт-карты может уменьшаться или увеличиваться во время работы сервера, например, я могу добавить или удалить физически смарт-карту от читателя (или многие другие события... например, исключение и т.д.).
![введите описание изображения здесь]()
Вычисление смарт-карт может занять некоторое время, поэтому мне нужно оптимизировать эти задания, чтобы использовать все доступные смарт-карты, если на веб-сервере есть параллельные запросы.
Я думал работать с пулом смарт-карт. Необычная вещь, по крайней мере для меня, заключается в том, что пул должен менять свой размер не в зависимости от клиентских запросов, а только от наличия смарт-карты.
![введите описание изображения здесь]()
Я изучил множество примеров:
- BlockingQueue. Хорошо выглядит, чтобы хранить запрос и останавливать поток, ожидая чего-то.
- FutureTask. Я могу использовать этот класс, чтобы клиент ожидал ответа, но какой excecutor должен выполнить эту задачу?
- ThreadPoolExecutor: Мне кажется, что мне нужно, но с этим я не могу изменить размер пула, причем каждый поток должен быть связан с одним слотом для смарт-карт. Это может быть решением, если я могу изменить размер пула (добавив поток при вставленной смарт-карте и удалив поток при удалении смарт-карты), и если я могу назначить конкретную смарт-карту для каждого потока.
Это управление смарт-картами, у меня есть один смарт-карт на смарт-карте, каждая смарт-карта имеет свой собственный номер слота.
public class SmartcardWrapper{
private int slot;
public SmartcardWrapper(int slot) {
this.slot=slot;
}
public byte[] compute(byte[] input) {
byte[] out=new byte[];
SmartcardApi.computerInput(slot,input,out); //Native method
return out;
}
}
Я попытался создать пул потоков с одним потоком на смарт-карту:
private class SmartcardThread extends Thread{
protected SmartcardWrapper sw;
public SmartcardThread(SmartcardWrapper sw){
this.sw=sw;
}
@Override
public void run() {
while(true){
byte[] input=queue.take();
byte output=sw.compute(input);
// I have to return back the output to the client
}
}
}
Все ждут чего-то в одной очереди ввода:
BlockingQueue<byte[]> queue=new BlockingQueue<byte[]>();
Но как вернуть обратный вывод из smartcard-thread на веб-сервер-клиент? Это позволяет мне думать, что BlockingQueue не является моим решением.
Как подойти к этой проблеме? Какую модель concurrency следует придерживаться?
правильно ли назначить один поток на смарт-карту или я просто могу использовать семафоры?
Ответы
Ответ 1
Ваше предположение:
ThreadPoolExecutor: Кажется, что мне нужно, но с этим я не могу изменить размер пула, причем каждый поток должен быть связан с одним слотом смарт-карты.
неверно.
You can set thread pool size dynamically.
Посмотрите ниже ThreadPoolExecutor API
public void setMaximumPoolSize(int maximumPoolSize)
Устанавливает максимально допустимое количество потоков. Это переопределяет любое значение, заданное в конструкторе. Если новое значение меньше текущего значения, избыточные существующие потоки будут прекращены, когда они станут свободными.
public void setCorePoolSize(int corePoolSize)
Устанавливает базовое число потоков. Это переопределяет любое значение, заданное в конструкторе. Если новое значение меньше текущего значения, избыточные существующие потоки будут прекращены, когда они станут свободными. Если больше, новые потоки при необходимости будут запущены для выполнения любых задач в очереди.
Core and maximum pool sizes:
A ThreadPoolExecutor
автоматически отрегулирует размер пула в соответствии с границами, установленными corePoolSize
и maximumPoolSize
.
Когда новая задача отправляется в методе execute(java.lang.Runnable)
и выполняется меньше потоков corePoolSize
, для обработки запроса создается новый поток, даже если другие рабочие потоки простаивают.
Если существует более чем corePoolSize
, но меньше, чем maximumPoolSize
потоков, новый поток будет создан только в том случае, если очередь заполнена.
Установив maximumPoolSize
на существенно неограниченное значение, такое как Integer.MAX_VALUE
, вы позволяете пулу вмещать произвольное количество одновременных задач. Но я бы не рекомендовал иметь такое количество потоков. Установите это значение с осторожностью.
Как правило, размеры ядра и максимального пула устанавливаются только при построении, но их также можно динамически изменять с помощью setCorePoolSize(int
) и setMaximumPoolSize(int)
.
EDIT:
Для лучшего использования пула потоков, если вы знаете, что максимальное количество карт составляет 6, вы можете использовать
ExecutorService executor = Executors.newFixedThreadPool(6);
ИЛИ
Ответ 2
Рассматривали ли вы вообще Apache Commons Pool?
Вам необходимо поддерживать пул объектов SmartcardWrapper, где каждый SmartcardWrapper будет представлять собой физическую смарт-карту. Всякий раз, когда вам нужно сделать новое вычисление, вы берете объект из пула, делаете расчет и возвращаете объект в пул, чтобы его можно было повторно использовать в следующем потоке.
Сам пул является потокобезопасным и блокирует, когда нет доступных объектов. Все, что вам нужно сделать, это реализовать api для добавления/удаления объектов SmartcardWrapper в пул.
Ответ 3
Возможно, я нашел разумное простое решение, основанное на следующих предположениях:
- отдельный процесс обрабатывает (системные события) уведомления для смарт-карт, которые становятся доступными или удаляются.
- Клиент не заботится о том, какую смарт-карту он может использовать, если он может использовать один без помех.
Эти два предположения фактически облегчают создание решения для объединения (разделяемые ресурсы), так как обычно сам пул отвечает за создание и удаление ресурсов, когда это необходимо. Без этой функции упрощается решение для объединения. Я предполагаю, что клиент, который получает смарт-карту из пула для использования, может выполнять необходимые функции смарт-карты в своем собственном потоке выполнения (подобно тому, как соединение с базой данных используется из пула соединений с базой данных для запроса данных из базы данных).
Я выполнил минимальное тестирование для двух классов, показанных ниже, и я боюсь, что основная часть работы заключается в письменных (единичных) тестах, которые доказывают, что пул работает правильно с одновременными клиентскими запросами в сочетании с добавлением и удалением смарт-карты Ресурсы. Если вы не хотите этого делать, то ответ от пользователя769771, вероятно, является лучшим решением. Но если вы это сделаете, попробуйте, посмотрите, подходит ли она. Идея заключается в том, что только один экземпляр пула ресурсов создается и используется всеми клиентами и обновляется отдельным процессом, который управляет доступностью смарт-карт.
import java.util.*;
import java.util.concurrent.*;
/**
* A resource pool that expects shared resources
* to be added and removed from the pool by an external process
* (i.e. not done by the pool itself, see {@link #add(Object)} and {@link #remove(Object)}.
* <br>A {@link ResourcePoolValidator} can optionally be used.
* @param <T> resource type handed out by the pool.
*/
public class ResourcePool<T> {
private final Set<T> registered = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
/* Use a linked list as FIFO queue for resources to lease. */
private final List<T> available = Collections.synchronizedList(new LinkedList<T>());
private final Semaphore availableLock = new Semaphore(0, true);
private final ResourcePoolValidator<T> validator;
public ResourcePool() {
this(null);
}
public ResourcePool(ResourcePoolValidator<T> validator) {
super();
this.validator = validator;
}
/**
* Add a resource to the pool.
* @return true if resource is not already in the pool.
*/
public synchronized boolean add(T resource) {
boolean added = false;
if (!registered.contains(resource)) {
registered.add(resource);
available.add(resource);
availableLock.release();
added = true;
}
return added;
}
/**
* Removes a resource from the pool.
* The resource might be in use (see {@link #isLeased(Object)})
* in which case {@link ResourcePoolValidator#abandoned(Object)} will be called
* when the resource is no longer used (i.e. released).
* @return true if resource was part of the pool and removed from the pool.
*/
public synchronized boolean remove(T resource) {
// method is synchronized to prevent multiple threads calling add and remove at the same time
// which could in turn bring the pool in an invalid state.
return registered.remove(resource);
}
/**
* If the given resource is (or was, see also {@link #remove(Object)} part of the pool,
* a returned value true indicates the resource is in use / checked out.
* <br>This is a relative expensive method, do not call it frequently.
*/
public boolean isLeased(T resource) {
return !available.contains(resource);
}
/**
* Try to get a shared resource for usage.
* If a resource is acquired, it must be {@link #release(Object)}d in a finally-block.
* @return A resource that can be exclusively used by the caller.
* @throws InterruptedException When acquiring a resource is interrupted.
* @throws TimeoutException When a resource is not available within the given timeout period.
*/
public T tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException, TimeoutException {
T resource = null;
long timeRemaining = tunit.toMillis(timeout);
final long tend = System.currentTimeMillis() + timeRemaining;
do {
if (availableLock.tryAcquire(timeRemaining, TimeUnit.MILLISECONDS)) {
resource = available.remove(0);
if (registered.contains(resource)) {
boolean valid = false;
try {
valid = (validator == null ? true : validator.isValid(resource));
} catch (Exception e) {
// TODO: log exception
e.printStackTrace();
}
if (valid) {
break; // return the "checked out" resource
} else {
// remove invalid resource from pool
registered.remove(resource);
if (validator != null) {
validator.abandoned(resource);
}
}
}
// resource was removed from pool, try acquire again
// note that this implicitly lowers the maximum available resources
// (an acquired permit from availableLock goes unused).
// TODO: retry puts us at the back of availableLock queue but should put us at the front of the queue
resource = null;
}
timeRemaining = tend - System.currentTimeMillis();
} while (timeRemaining > 0L);
if (resource == null) {
throw new TimeoutException("Unable to acquire a resource within " + tunit.toMillis(timeout) + " ms.");
}
return resource;
}
/**
* This method must be called by the caller / client whenever {@link #tryAcquire(long, TimeUnit)}
* has returned a resource. If the caller has determined the resource is no longer valid,
* the caller should call {@link #remove(Object)} before calling this method.
* @param resource no longer used.
*/
public void release(T resource) {
if (resource == null) {
return;
}
if (registered.contains(resource)) {
available.add(resource);
availableLock.release();
} else {
if (validator != null) {
validator.abandoned(resource);
}
}
}
/** An array (copy) of all resources registered in the pool. */
@SuppressWarnings("unchecked")
public T[] getRegisteredResources() {
return (T[]) registered.toArray(new Object[registered.size()]);
}
}
И отдельный класс с функциями, связанными с отдельным процессом, который управляет доступностью smarcard.
import java.util.concurrent.TimeUnit;
/**
* Used by a {@link ResourcePool} to validate a resource before handing it out for lease
* (see {@link #isValid(Object)} and signal a resource is no longer used (see {@link #abandoned(Object)}).
*/
public class ResourcePoolValidator<T> {
/**
* Overload this method (this method does nothing by default)
* to validate a resource before handing it out for lease.
* If this method returns false or throws an exception (which it preferably should not do),
* the resource is removed from the pool.
* @return true if the resource is valid for leasing
*/
public boolean isValid(T resource) {
return true;
}
/**
* Called by the {@link ResourcePool#release(Object)} method when a resource is released by a caller
* but the resource was previously removed from the pool and in use.
* <br>Called by {@link ResourcePool#tryAcquire(long, TimeUnit)} if a resource if not valid
* (see {@link #isValid(Object)}.
* <br>Overload this method (this method does nothing by default) to create a notification of an unused resource,
* do NOT do any long period of processing as this method is called from a caller (client) thread.
*/
public void abandoned(T resource) {
// NO-OP
}
}
Ответ 4
Изучив требования, лучшая архитектура будет заключаться в том, чтобы отделить вычисление смарт-карты от ваших веб-сервисов.
Полагаться на веб-службы, чтобы ждать от задач с интенсивным процессором, приведет к таймаутам.
Лучшим решением является предварительная интеллектуальная карта, использующая периодическое задание и сохраняющая эти слоты, пар вычислений на сервере кэширования, например Redis.
![введите описание изображения здесь]()
Задание синхронизатора смарт-карт представляет собой отдельное приложение J2SE Stand Alone, которое периодически проверяет, какая смарт-карта доступна и активна (без ошибок), и обновляет Redis Cache слотом и вычисляет в качестве пары "ключ/значение". Если смарт-карта недоступна, она будет удалена из кеша.
Веб-служба просто проверит кеш Redis для определенного ключа слота, и если он найдет значение, оно вернет его или вернет не найденное для этого слота (недоступно или ошибка)
Эта конструкция масштабируется как в конце смарт-карты, так и в конце клиентских запросов.
Ответ 5
В ответ на ваш вопрос о том, как вернуть результат обратно вызывающему:
Все ждут чего-то в одной очереди ввода:
BlockingQueue queue = new BlockingQueue();
Но как вернуть обратный вывод из smartcard-thread в Веб-сервер-клиент? Это позволяет мне думать, что BlockingQueue не мой Решение.
Идея очереди отправки в основном прекрасна, но вам также нужна очередь в потоке, чтобы вернуть результат в адрес отправителя задания...
Измените очередь отправки на:
BlockingQueue<JobSubmitRec> queue=new BlockingQueue<JobSubmitRec>();
и JobSubmitRec будет иметь байт [] и одноразовую очередь для возврата результата:
class JobSubmitRec
{
byte[] data;
BlockingQueue<JobSubmitResult> result=new LinkedBlockingQueue<JobSubmitResult>();
}
и ваш рабочий поток Thread будет выглядеть примерно так:
public void run() {
while(true){
JobSubmitRec submitrec = queue.take();
byte[] input = submitrec.data;
byte output = sw.compute(input);
submitrec.result.put( new JobSubmitResult(output) );
}
}
и клиент, который отправит задание, будет выглядеть так:
JobSubmitRec jsr = new JobSubmitRec( data );
queue.put( jsr );
JobSubmitResult result = jsr.result.take();
// use result here