Гибкий CountDownLatch?
Я столкнулся с проблемой дважды, когда поток производителя создает N рабочих элементов, передает их в ExecutorService
, а затем должен ждать, пока не обработаются все N элементов.
Предостережение
- N заранее не известно. Если бы это было, я просто создал бы
CountDownLatch
, а затем создавал поток await()
, пока не завершится вся работа.
- Использование
CompletionService
неуместно, потому что хотя мой поток производителя должен блокировать (т.е. вызывая take()
), не указывать, что все работы завершены, чтобы поток производителя остановка ожидания.
Мое последнее предпочтительное решение - использовать целочисленный счетчик и приращение, когда каждый элемент работы отправляется, а декремент - при обработке рабочего элемента. После подбора всех задач N мой поток производителя должен будет ждать блокировки, проверяя, будет ли counter == 0
всякий раз, когда он уведомляется. Потребительский поток должен будет уведомить производителя, если он уменьшил счетчик, а новое значение равно 0.
Есть ли лучший подход к этой проблеме или есть подходящая конструкция в java.util.concurrent
, которую я должен использовать, а не "сворачивать свои собственные"?
Спасибо заранее.
Ответы
Ответ 1
java.util.concurrent.Phaser
похоже, что это сработает хорошо для вас. Планируется выпуск в Java 7, но наиболее стабильную версию можно найти на веб-сайте jsr166.
Фазер - это прославленный Циклический Барьер. Вы можете зарегистрировать N количество сторон и когда вы готовы дождаться своего продвижения на определенной фазе.
Быстрый пример того, как он будет работать:
final Phaser phaser = new Phaser();
public Runnable getRunnable(){
return new Runnable(){
public void run(){
..do stuff...
phaser.arriveAndDeregister();
}
};
}
public void doWork(){
phaser.register();//register self
for(int i=0 ; i < N; i++){
phaser.register(); // register this task prior to execution
executor.submit( getRunnable());
}
phaser.arriveAndAwaitAdvance();
}
Ответ 2
Конечно, вы можете использовать CountDownLatch
, защищенный AtomicReference
, чтобы ваши задачи были обернуты таким образом:
public class MyTask extends Runnable {
private final Runnable r;
public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }
public void run() {
r.run();
while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
l.get().countDown();
}
}
Обратите внимание на, что задачи запускают свою работу, а затем вращаются до тех пор, пока не будет установлен обратный отсчет (т.е. общее количество заданий известно). Как только обратный отсчет установлен, они подсчитывают его и выходят. Они передаются следующим образом:
AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));
После точки создания/представления вашей работы, когда вы знаете, сколько задач вы создали:
latch.set(new CountDownLatch(nTasks));
latch.get().await();
Ответ 3
Я использовал ExecutorCompletionService для чего-то вроде этого:
ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
executor.submit(new Processor());
count++;
}
//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
executor.take().get();
}
Это включает в себя сохранение очереди заданий, которые были отправлены, поэтому, если ваш список произвольно длинный, ваш метод может быть лучше.
Но не забудьте использовать AtomicInteger
для согласования, чтобы вы могли увеличивать его в одном потоке и уменьшать его в рабочих потоках.
Ответ 4
Я предполагаю, что вашему продюсеру не нужно знать, когда очередь пуста, но должна знать, когда последняя задача была завершена.
Я бы добавил к waitforWorkDone(producer)
метод waitforWorkDone(producer)
. Производитель может добавить свои N задачи и вызвать метод wait. Метод wait блокирует входящий поток, если рабочая очередь не пуста и в настоящий момент не выполняются задачи.
Потребительские потоки notifyAll()
в блокировке waitfor, если его задача завершена, очередь пуста и никакая другая задача не выполняется.
Ответ 5
Вы считали это:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection)
Вот пример, который я написал:
https://github.com/moulaali/java_fun/blob/master/src/WaitForAllConcurrencyExample.java
Ответ 6
То, что вы описали, очень похоже на использование стандартного семафора, но используется "назад".
- Семафор начинается с 0 разрешений
- Каждый рабочий модуль освобождает одно разрешение, когда оно завершается
- Вы блокируете, ожидая получения разрешений N.
Кроме того, у вас есть возможность только приобрести M < N, что полезно, если вы хотите проверить промежуточное состояние. Например, я тестирую асинхронную ограниченную очередь сообщений, поэтому я ожидаю, что очередь будет заполнена для некоторого M < N, поэтому я могу получить M и проверить, действительно ли очередь заполнена, а затем получить оставшиеся разрешения N-M после того, как вы отправили сообщения из очереди.