Как я могу заставить команду ThreadPoolExecutor ждать, если слишком много данных, над которыми она должна работать?
Я получаю данные с сервера очереди, и мне нужно обработать его и отправить подтверждение. Что-то вроде этого:
while (true) {
queueserver.get.data
ThreadPoolExecutor //send data to thread
queueserver.acknowledgement
Я не совсем понимаю, что происходит в потоках, но я думаю, что эта программа получает данные, отправляет им поток и сразу же подтверждает это. Поэтому, даже если у меня есть предел каждой очереди, у вас может быть только 200 неподтвержденных предметов, она будет просто тянуть так же быстро, как она может ее получить. Это хорошо, когда я пишу программу на одном сервере, но если я использую несколько рабочих, это становится проблемой, потому что количество элементов в очереди потоков не является отражением выполненной работы, а вместо того, насколько быстро может получать элементы из сервера очереди.
Есть ли что-нибудь, что я могу сделать, чтобы заставить программу ждать, если очередь потоков заполнена?
Ответы
Ответ 1
Как заставить команду ThreadPoolExecutor ждать, если слишком много данных, над которыми она должна работать?
Я не уверен на 100%, что я понимаю вашу проблему здесь. Конечно, вместо очереди с открытым концом вы можете использовать BlockingQueue
с ограничением на нее:
BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
В отношении заданий, представленных в ExecutorService
, вместо использования ExecutorService
по умолчанию, созданного с помощью Executors
, которые используют неограниченную очередь, вы можете создать свой собственный:
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(200));
Как только очередь заполнится, она заставит ее отклонить любые новые задачи, которые будут отправлены. Вам нужно будет установить RejectedExecutionHandler
, который отправится в очередь. Что-то вроде:
final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will block if the queue is full
executor.getQueue().put(r);
}
});
Я думаю, что большая ошибка в том, что Java не имеет ThreadPoolExecutor.CallerBlocksPolicy
.
Ответ 2
Если вы хотите получить подтверждение, когда работник начнет работу над заданием, вы можете создать пользовательский ThreadFactory
, который отправляет подтверждение из потока перед выполнением фактической работы. ИЛИ вы можете переопределить beforeExecute
для ThreadPoolExecutor
.
Если вы хотите получить подтверждение при освобождении нового работника для новой задачи, я думаю, вы можете инициализировать ThreadPoolExecutor
с помощью SynchronousQueue
и ThreadPoolExecutor.CallerRunsPolicy
, или собственной политикой, в которой блокирует вызывающий.
Ответ 3
во-первых, я думаю, что ваше отношение неверно, потому что то, что вы делали в своем псевдокоде, занято ожиданием, вы должны прочитать учебник Concurrency из java toturial http://docs.oracle.com/javase/tutorial/essential/concurrency/
игнорируя это, плохо предложите вам решение с оживленным ожиданием (которое не рекомендуется):
ExecutorService e1 = Executors.newFixedThreadPool(20);
while (true) {
if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll());
if (!myq.isEmpty()) e1.execute(myq.poll());
}
ПРИМЕЧАНИЯ:
1. Убедитесь, что ваш myq синхронизирован, как сказано в других ответах. вы можете расширить некоторую блокирующую очередь, чтобы убедиться в правильности синхронизации.
2.You реализует класс runnable, который делает то, что вы exepct с сервера в итерации
службы, эти runnables должны получить myq как параметр для конструктора и сохранить его как глобальную переменную.
3.myq получает runnables, что в конце его метода запуска вы должны убедиться, что runnable удаляет из myq.
Ответ 4
Как насчет того, чтобы иметь blockingPool, который не выполнит более 200 задач и дождитесь завершения задачи перед отправкой задания 201. Я достиг этого с помощью семафора в своем приложении. Вы также можете изменить предел, передав значение его конструктору.
Единственное отличие здесь от ответа @Gray заключается в том, что редко любая задача будет отвергнута в этом случае. Семафор выполнит любую 201 задачу, если другая задача не закончится. Тем не менее, у нас есть обработчик отклонения, чтобы повторно отправить эту задачу исполнителю в случае любого отклонения.
private class BlockingPool extends ThreadPoolExecutor {
private final Semaphore semaphore;
public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.execute(r);
}
});
semaphore = new Semaphore(tasksAllowedInThreads);
}
@Override
public void execute(Runnable task){
boolean acquired = false;
do{
try{
semaphore.acquire();
acquired = true;
} catch (final InterruptedException e){
// log
}
} while (!acquired); // run in loop to handle InterruptedException
try{
super.execute(task);
} catch (final RejectedExecutionException e){
System.out.println("Task Rejected");
semaphore.release();
throw e;
}
}
@Override
protected void afterExecute(Runnable r, Throwable t){
super.afterExecute(r, t);
if (t != null){
t.printStackTrace();
}
semaphore.release();
}
}
Это имеет смысл!