Исполнители Java: как установить приоритет задачи?
Есть ли возможность установить приоритет для задач, выполняемых исполнителями? Я нашел некоторые утверждения в JCIP об этом, но я не могу найти какой-либо пример, и я не могу найти ничего связанного в документах.
Из JCIP:
Политика выполнения определяет "что, где, когда и как" задачи выполнение, в том числе:
- ...
- В каком порядке должны выполняться задачи (FIFO, LIFO, приоритетный порядок)?
- ...
UPD: Я понял, что я спросил не то, что я хотел спросить. Я действительно хотел:
Как использовать/эмулировать приоритет потоков нитей (т.е. что было thread.setPriority()
) с картой исполнителей?
Ответы
Ответ 1
В настоящее время единственными конкретными реализациями интерфейса Executor являются ThreadPoolExecutor и ScheduledThreadpoolExecutor
Вместо использования утилиты / factory класса Executors вы должны создать экземпляр, используя конструктор.
Вы можете передать BlockingQueue конструкторам ThreadPoolExecutor.
Одна из реализаций BlockingQueue, PriorityBlockingQueue позволяет передавать Comparator в конструктор, таким образом позволяя вам определить порядок выполнение.
Ответ 2
Идея здесь заключается в использовании PriorityBlockingQueue в исполнителе. Для этого:
- Создайте компаратор, который бы сравнивал наши фьючерсы.
- Создайте прокси для будущего, чтобы сохранить приоритет.
- Переопределите 'newTaskFor', чтобы обернуть каждое будущее в нашем прокси.
Сначала вам нужно сохранить приоритет в своем будущем:
class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get();
}
public void run() {
src.run();
}
}
Далее вам нужно определить компаратор, который правильно сортировал бы приоритетные фьючерсы:
class PriorityFutureComparator implements Comparator<Runnable> {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
}
Затем предположим, что у нас есть такая длинная работа, как это:
class LenthyJob implements Callable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
Затем для выполнения этих заданий приоритет будет выглядеть так:
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
int nThreads = 2;
int qInitialSize = 10;
ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
}
};
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}
Это много кода, но это почти единственный способ сделать это.
На моей машине вывод выглядит следующим образом:
Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97
Ответ 3
вы можете использовать ThreadPoolExecutor с приоритетной очередью блокировки
Как реализовать PriorityBlockingQueue с помощью ThreadPoolExecutor и настраиваемых задач
Ответ 4
Вы можете указать ThreadFactory
в конструкторе ThreadPoolExecutor
(или Executors
factory). Это позволяет вам предоставлять потоки заданного приоритета потока для исполнителя.
Чтобы получать разные приоритеты потоков для разных заданий, вам нужно отправить их исполнителям с различными фабриками потоков.
Ответ 5
Помните, что setPriority (..) обычно работает не под Linux. Для получения более подробной информации см. Следующие ссылки:
Ответ 6
Просто хочу добавить свой вклад в эту дискуссию. Я реализовал этот ReorderingThreadPoolExecutor для очень конкретной цели, которая может явно привести к фронту исполнителя BlockingQueue (в этом case LinkedBlockingDeque), когда я захочу и не буду иметь дело с приоритетами (что может привести к взаимоблокировкам и, во всяком случае, исправлено).
Я использую это для управления (внутри приложения для Android), когда мне нужно загрузить много изображений, отображаемых в виде длинного списка. Всякий раз, когда пользователь быстро прокручивается, очередь исполнителя заполняется запросами загрузки изображений: путем перемещения последних в верхней части очереди я добился гораздо более высоких результатов при загрузке изображений, которые на самом деле находятся на экране, задерживая загрузку те, которые, вероятно, понадобятся позже. Обратите внимание, что я использую внутренний параллельный ключ карты (который может быть так же прост, как строка URL-адреса изображения), чтобы добавить задачи к исполнителям, чтобы я мог их позднее загрузить для переупорядочения.
Было бы много других способов сделать то же самое и, может быть, слишком сложно, но он отлично работает, а Facebook в его Android SDK делает что-то подобное в своей собственной очереди рабочих потоков.
Не стесняйтесь взглянуть на код и дать мне предложения, в рамках проекта Android, но удаление нескольких журналов и аннотаций сделает класс чистым Java 6.
Ответ 7
Вы можете реализовать свой собственный ThreadFactory и установить его в ThreadPoolExecutor следующим образом:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));
где моя OpJobThreadFactory выглядит следующим образом:
public final static class OpJobThreadFactory implements ThreadFactory {
private int priority;
private boolean daemon;
private final String namePrefix;
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
public OpJobThreadFactory(int priority) {
this(priority, true);
}
public OpJobThreadFactory(int priority, boolean daemon) {
this.priority = priority;
this.daemon = daemon;
namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(priority);
return t;
}
}