Контроль выполнения выполнения задачи с помощью ExecutorService
У меня есть процесс, который делегирует асинхронные задачи пулу потоков. Мне нужно обеспечить выполнение определенных задач по порядку.
Так, например,
Задачи приходят в порядок
Задачи a1, b1, c1, d1, e1, a2, a3, b2, f1
Задачи могут выполняться в любом порядке, за исключением случаев, когда существует естественная зависимость, поэтому a1, a2, a3 должны обрабатываться в этом порядке либо распределением на один поток, либо блокировкой их до тех пор, пока я не узнаю, что предыдущая задача была завершена.
В настоящее время он не использует пакет Java Concurrency, но я рассматриваю возможность изменения, чтобы воспользоваться поддержкой управления потоками.
Есть ли у кого-нибудь подобное решение или предложения о том, как достичь этого?
Ответы
Ответ 1
Когда я делал это раньше, у меня обычно был заказ, обработанный компонентом, который затем отправляет вызывающие/исполняемые файлы в Исполнитель.
Что-то вроде.
- Получил список задач для запуска, некоторые с зависимостями
- Создайте Исполнителя и заверните с помощью службы-исполнителя.
- Поиск всех задач без каких-либо зависимостей, планирование их через службу завершения
- Опросить службу завершения
- По мере выполнения каждой задачи
- Добавьте его в "завершенный" список
- Переоцените любые ожидающие задачи по "завершенному списку", чтобы узнать, являются ли они "завершением зависимости". Если так планировать их
- Промывка повторяется до тех пор, пока все задания не будут отправлены/завершены.
Служба завершения - это отличный способ получить задачи по мере их завершения, а не пытаться опросить кучу фьючерсов. Однако вы, вероятно, захотите сохранить Map<Future, TaskIdentifier>
, который заполняется, когда задание выполняется по расписанию через службу завершения, так что, когда служба завершения предоставит вам завершенное Будущее, вы можете выяснить, что это за TaskIdentifier
.
Если вы когда-либо находите себя в состоянии, когда задачи все еще ждут запуска, но ничего не выполняется, и ничто не может быть запланировано, тогда у вас есть проблема с круговой зависимостью.
Ответ 2
Я пишу собственный Executor, который гарантирует заказ задачи для задач с одним и тем же ключом. Он использует карту очередей для задач заказа с одним и тем же ключом. Каждая задача с ключом выполняет следующую задачу с тем же ключом.
Это решение не обрабатывает исключение RejectedExecutionException или другие исключения из делегированного Исполнителя! Поэтому делегированный Исполнитель должен быть "неограниченным".
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{
private final Executor delegate;
private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();
public OrderingExecutor(Executor delegate){
this.delegate = delegate;
}
@Override
public void execute(Runnable task) {
// task without key can be executed immediately
delegate.execute(task);
}
public void execute(Runnable task, Object key) {
if (key == null){ // if key is null, execute without ordering
execute(task);
return;
}
boolean first;
Runnable wrappedTask;
synchronized (keyedTasks){
Queue<Runnable> dependencyQueue = keyedTasks.get(key);
first = (dependencyQueue == null);
if (dependencyQueue == null){
dependencyQueue = new LinkedList<Runnable>();
keyedTasks.put(key, dependencyQueue);
}
wrappedTask = wrap(task, dependencyQueue, key);
if (!first)
dependencyQueue.add(wrappedTask);
}
// execute method can block, call it outside synchronize block
if (first)
delegate.execute(wrappedTask);
}
private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
return new OrderedTask(task, dependencyQueue, key);
}
class OrderedTask implements Runnable{
private final Queue<Runnable> dependencyQueue;
private final Runnable task;
private final Object key;
public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
}
@Override
public void run() {
try{
task.run();
} finally {
Runnable nextTask = null;
synchronized (keyedTasks){
if (dependencyQueue.isEmpty()){
keyedTasks.remove(key);
}else{
nextTask = dependencyQueue.poll();
}
}
if (nextTask!=null)
delegate.execute(nextTask);
}
}
}
}
Ответ 3
При отправке Runnable
или Callable
в ExecutorService
вы получите Future
в ответ. Пусть потоки, которые зависят от a1, передаются a1 Future
и вызывают Future.get()
. Это будет заблокировано до тех пор, пока поток не завершится.
Итак:
ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
@Override
public void run() {
f1.get();
... // do stuff
}
}
exec.submit(a2);
и т.д.
Ответ 4
Другой вариант - создать собственного исполнителя, вызвать его OrderedExecutor и создать массив инкапсулированных объектов ThreadPoolExecutor с 1 потоком на внутренний исполнитель. Затем вы предоставляете механизм для выбора одного из внутренних объектов, например, вы можете сделать это, предоставив интерфейс, который может реализовать пользователь вашего класса:
executor = new OrderedExecutor( 10 /* pool size */, new OrderedExecutor.Chooser() {
public int choose( Runnable runnable ) {
MyRunnable myRunnable = (MyRunnable)runnable;
return myRunnable.someId();
});
executor.execute( new MyRunnable() );
Реализация OrderedExecutor.execute() затем будет использовать Chooser для получения int, вы измените это с размером пула и указате свой индекс во внутренний массив. Идея заключается в том, что "someId()" вернет одинаковое значение для всех "a" и т.д.
Ответ 5
Вы можете использовать Executors.newSingleThreadExecutor(), но он будет использовать только один поток для выполнения ваших задач. Другой вариант - использовать CountDownLatch. Вот простой пример:
public class Main2 {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch cdl1 = new CountDownLatch(1);
final CountDownLatch cdl2 = new CountDownLatch(1);
final CountDownLatch cdl3 = new CountDownLatch(1);
List<Runnable> list = new ArrayList<Runnable>();
list.add(new Runnable() {
public void run() {
System.out.println("Task 1");
// inform that task 1 is finished
cdl1.countDown();
}
});
list.add(new Runnable() {
public void run() {
// wait until task 1 is finished
try {
cdl1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 2");
// inform that task 2 is finished
cdl2.countDown();
}
});
list.add(new Runnable() {
public void run() {
// wait until task 2 is finished
try {
cdl2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 3");
// inform that task 3 is finished
cdl3.countDown();
}
});
ExecutorService es = Executors.newFixedThreadPool(200);
for (int i = 0; i < 3; i++) {
es.submit(list.get(i));
}
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
}
}
Ответ 6
В библиотеке Habanero-Java существует концепция задач, управляемых данными, которые могут использоваться для выражения зависимостей между задачами и предотвращения потоков, блокирующие операции. В оболочках библиотеки Habanero-Java используется JDKs ForkJoinPool (т.е. ExecutorService).
Например, ваш прецедент для задач A1, A2, A3,... может быть выражен следующим образом:
HjFuture a1 = future(() -> { doA1(); return true; });
HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; });
HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; });
Обратите внимание, что a1, a2 и a3 являются просто ссылками на объекты типа HjFuture и могут поддерживаться в ваших настраиваемых структурах данных, чтобы указать зависимости как и когда задачи A2 и A3 входят во время выполнения.
Есть некоторые учебные слайды, доступные.
Вы можете найти дополнительную документацию как javadoc, резюме API и primers.
Ответ 7
Я создал OrderingExecutor для этой проблемы. Если вы передадите один и тот же ключ методу execute() с разными runnables, выполнение runnables с тем же ключом будет в порядке вызова execute() и никогда не будет перекрываться.
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/**
* Special executor which can order the tasks if a common key is given.
* Runnables submitted with non-null key will guaranteed to run in order for the same key.
*
*/
public class OrderedExecutor {
private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
new ConcurrentLinkedQueue<Runnable>());
private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>();
private Executor delegate;
private volatile boolean stopped;
public OrderedExecutor(Executor delegate) {
this.delegate = delegate;
}
public void execute(Runnable runnable, Object key) {
if (stopped) {
return;
}
if (key == null) {
delegate.execute(runnable);
return;
}
Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> {
v.add(runnable);
return v;
});
if (queueForKey == null) {
// There was no running task with this key
Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>());
newQ.add(runnable);
// Use putIfAbsent because this execute() method can be called concurrently as well
queueForKey = taskMap.putIfAbsent(key, newQ);
if (queueForKey != null)
queueForKey.add(runnable);
delegate.execute(new InternalRunnable(key));
}
}
public void shutdown() {
stopped = true;
taskMap.clear();
}
/**
* Own Runnable used by OrderedExecutor.
* The runnable is associated with a specific key - the Queue<Runnable> for this
* key is polled.
* If the queue is empty, it tries to remove the queue from taskMap.
*
*/
private class InternalRunnable implements Runnable {
private Object key;
public InternalRunnable(Object key) {
this.key = key;
}
@Override
public void run() {
while (true) {
// There must be at least one task now
Runnable r = taskMap.get(key).poll();
while (r != null) {
r.run();
r = taskMap.get(key).poll();
}
// The queue emptied
// Remove from the map if and only if the queue is really empty
boolean removed = taskMap.remove(key, EMPTY_QUEUE);
if (removed) {
// The queue has been removed from the map,
// if a new task arrives with the same key, a new InternalRunnable
// will be created
break;
} // If the queue has not been removed from the map it means that someone put a task into it
// so we can safely continue the loop
}
}
}
/**
* Special Queue implementation, with equals() and hashCode() methods.
* By default, Java SE queues use identity equals() and default hashCode() methods.
* This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()).
*
* @param <E> The type of elements in the queue.
*/
private static class QueueWithHashCodeAndEquals<E> implements Queue<E> {
private Queue<E> delegate;
public QueueWithHashCodeAndEquals(Queue<E> delegate) {
this.delegate = delegate;
}
public boolean add(E e) {
return delegate.add(e);
}
public boolean offer(E e) {
return delegate.offer(e);
}
public int size() {
return delegate.size();
}
public boolean isEmpty() {
return delegate.isEmpty();
}
public boolean contains(Object o) {
return delegate.contains(o);
}
public E remove() {
return delegate.remove();
}
public E poll() {
return delegate.poll();
}
public E element() {
return delegate.element();
}
public Iterator<E> iterator() {
return delegate.iterator();
}
public E peek() {
return delegate.peek();
}
public Object[] toArray() {
return delegate.toArray();
}
public <T> T[] toArray(T[] a) {
return delegate.toArray(a);
}
public boolean remove(Object o) {
return delegate.remove(o);
}
public boolean containsAll(Collection<?> c) {
return delegate.containsAll(c);
}
public boolean addAll(Collection<? extends E> c) {
return delegate.addAll(c);
}
public boolean removeAll(Collection<?> c) {
return delegate.removeAll(c);
}
public boolean retainAll(Collection<?> c) {
return delegate.retainAll(c);
}
public void clear() {
delegate.clear();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof QueueWithHashCodeAndEquals)) {
return false;
}
QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj;
return Arrays.equals(toArray(), other.toArray());
}
@Override
public int hashCode() {
return Arrays.hashCode(toArray());
}
}
}
Ответ 8
Я написал свою службу победителя, которая известна в последовательности. Он упорядочивает задачи, которые содержат определенную ссылку и текущий поток.
Вы можете выполнить реализацию на https://github.com/nenapu/SequenceAwareExecutorService