Java: учебники/объяснения jsr166y Phaser
Этот вопрос был задан два года назад, но ресурсы, о которых он упоминает, либо не очень полезны (IMHO), либо ссылки более недействительны.
Там должны быть хорошие учебники, чтобы понять Phaser
. Я прочитал javadoc, но мои глаза затуманиваются, так как для того, чтобы действительно понять javadoc, вам нужно знать, как эти классы должны использоваться.
У кого-нибудь есть предложения?
Ответы
Ответ 1
Для Фазера я ответил на несколько вопросов. Видение их может помочь в понимании их приложений. Они связаны внизу. Но чтобы понять, что делает Фейзер и почему его полезно знать, что он решает.
Вот атрибуты CountdownLatch и CyclicBarrier
Примечание:
- Количество сторон - это еще один способ сказать # разных потоков.
- Не многократно использовать, вам нужно создать новый экземпляр
барьер перед повторным использованием.
- Препятствие возможно, если поток может прибыть и продолжить работу
не ожидая других или может дождаться завершения всех потоков.
CountdownLatch
- Исправлено количество сторон
- Невозможен
- Продвинутый (смотрите
latch.countDown();
с поддержкой
latch.await();
должен ждать)
CyclicBarrier
- Исправлено количество сторон
- Многоразовые
- Недоступно
Таким образом, CountdownLatch не может использоваться повторно, вы должны каждый раз создавать новый экземпляр, но он доступен. CylicBarrier может использоваться повторно, но все потоки должны ждать, пока каждая сторона не достигнет барьера.
Phaser
- Динамическое количество сторон
- Многоразовые
- выдвигаться
Когда поток хочет быть известным Phaser, они вызывают phaser.register()
, когда поток приходит к барьеру, который они вызывают phaser.arrive()
, и вот где он продвигается. Если поток хочет дождаться завершения всех зарегистрированных задач phaser.arriveAndAwaitAdvance()
Существует также концепция фазы, в которой потоки могут ждать по завершении других операций, которые могут не завершиться. Как только все потоки приходят к фазерному барьеру, создается новая фаза (прирост единицы).
Вы можете взглянуть на мои другие ответы, возможно, это поможет:
Java ExecutorService: waitTermination всех рекурсивно созданных задач
Гибкий CountDownLatch?
Ответ 2
Для Phaser
по крайней мере, я думаю, JavaDoc предлагает довольно четкое объяснение. Это класс, который вы бы использовали для синхронизации партии потоков в том смысле, что вы можете зарегистрировать каждый поток в пакете с помощью Phaser
, а затем использовать Phaser
, чтобы они блокировались до тех пор, пока каждый поток в пакете не будет уведомил Phaser
, после чего начнется выполнение любого заблокированного потока (-ов). Этот цикл ожидания/уведомления может повторяться снова и снова, по желанию/обязательно.
Их пример кода дает разумный пример (хотя мне очень не нравится их стиль с отступом в 2 символа):
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
Это устанавливает Phaser
с регистрационным счетом tasks.size() + 1
, и для каждой задачи создается новый Thread
, который будет блокироваться до следующего продвижения Phaser
(т.е. времени, в которое tasks.size() + 1
записи были записаны), а затем выполнить связанную с этим задачу. Каждый созданный Thread
также мгновенно запускается, поэтому Phaser
выходит из цикла с записью tasks.size()
.
Последний вызов phaser.arriveAndDeregister()
будет записывать окончательное прибытие, а также уменьшает счет регистрации, чтобы теперь он равнялся tasks.size()
. Это приводит к ускорению Phaser
, что фактически позволяет запускать все задачи одновременно. Это можно повторить, выполнив что-то вроде:
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
while (true) {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
... это то же самое, что и раньше, за исключением добавления цикла, который заставляет задачу запускаться повторно. Поскольку каждая итерация вызывает phaser.arriveAndAwaitAdvance()
, выполнение потоков задач будет синхронизировано таким образом, что task-0 не начнет свою вторую итерацию до тех пор, пока каждая другая задача не завершит свою первую итерацию и не сообщит, что Phaser
, которая готова начать свою вторую итерация.
Это может быть полезно, если выполняемые вами задачи сильно различаются в зависимости от времени, которое требуется выполнить, и если вы хотите, чтобы более быстрые потоки не синхронизировались с более медленными.
Для возможного реального приложения рассмотрите игру, в которой выполняются отдельные графики и физические потоки. Вы не хотите, чтобы данные физического потока вычислялись для кадра 100, если графический поток застревает в кадре 6, а использование Phaser
- это один из возможных подходов к обеспечению того, чтобы потоки графики и физики всегда работали на одном и том же кадре в то же время (а также то, что если один поток значительно медленнее другого, более быстрый поток изящно дает ресурсы ЦП, так что, надеюсь, более медленный поток может ускориться).
Ответ 3
Phaser несколько похож на функциональность CyclicBarrier и CountDownLatch, но он обеспечивает большую гибкость, чем обе из них.
В CyclicBarrier мы использовали регистрацию сторон в конструкторе, но Phaser предоставляет нам гибкость регистрации и деривации сторон в любое время.
Для регистрации сторон мы можем использовать любое из следующих -
-
- Конструкторы
- зарегистрируйтесь или
- bulkRegister
Для сторон deRegistering мы можем использовать -
-
- receiveAndDeregister, или
register -
Добавляет/Регистрирует новую неуправляемую сторону этого фазера. Он возвращает
- номер фазы прибытия, к которой применяется эта регистрация.
- Если фазер закончился, значение отрицательное, и регистрация не влияет.
Если выполняется вызов метода onAdvance(), то перед возвратом этот метод может ждать его завершения.
Если у этого фазера есть родитель, и у этого фазера нет зарегистрированных участников, этот дочерний фазер также регистрируется его родителем.
Программа для демонстрации использования Phaser >
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) {
/*Creates a new phaser with 1 registered unArrived parties
* and initial phase number is 0
*/
Phaser phaser=new Phaser(1);
System.out.println("new phaser with 1 registered unArrived parties"
+ " created and initial phase number is 0 ");
//Create 3 threads
Thread thread1=new Thread(new MyRunnable(phaser,"first"),"Thread-1");
Thread thread2=new Thread(new MyRunnable(phaser,"second"),"Thread-2");
Thread thread3=new Thread(new MyRunnable(phaser,"third"),"Thread-3");
System.out.println("\n--------Phaser has started---------------");
//Start 3 threads
thread1.start();
thread2.start();
thread3.start();
//get current phase
int currentPhase=phaser.getPhase();
/*arriveAndAwaitAdvance() will cause thread to wait until current phase
* has been completed i.e. until all registered threads
* call arriveAndAwaitAdvance()
*/
phaser.arriveAndAwaitAdvance();
System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");
//------NEXT PHASE BEGINS------
currentPhase=phaser.getPhase();
phaser.arriveAndAwaitAdvance();
System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------");
/* current thread Arrives and deRegisters from phaser.
* DeRegistration reduces the number of parties that may
* be required to advance in future phases.
*/
phaser.arriveAndDeregister();
//check whether phaser has been terminated or not.
if(phaser.isTerminated())
System.out.println("\nPhaser has been terminated");
}
}
class MyRunnable implements Runnable{
Phaser phaser;
MyRunnable(Phaser phaser,String name){
this.phaser=phaser;
this.phaser.register(); //Registers/Add a new unArrived party to this phaser.
System.out.println(name +" - New unarrived party has "
+ "been registered with phaser");
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +
" - party has arrived and is working in "
+ "Phase-"+phaser.getPhase());
phaser.arriveAndAwaitAdvance();
//Sleep has been used for formatting output
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//------NEXT PHASE BEGINS------
System.out.println(Thread.currentThread().getName() +
" - party has arrived and is working in "
+ "Phase-"+phaser.getPhase());
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
}
}
bulkRegister -
Добавляет к этому фазеру количество новых неприбыльных сторон. Он возвращает
- номер фазы прибытия, к которой применяется эта регистрация.
- Если фазер закончился, значение отрицательное, и регистрация не влияет.
Если выполняется вызов метода onAdvance(), то перед возвратом этот метод может ждать его завершения.
arriveAndDeregister -
Текущая нить (вечеринка) Прибывает и исчезает из фазера. DeRegistration уменьшает количество сторон, которые могут потребоваться в будущем для перехода к следующей фазе.
Если этот фазер имеет родительский, и у этого фазера не было зарегистрированных сторон, этот дочерний фазер также регистрируется с его родителем.
Программа для демонстрации родительского и дочернего Phaser
import java.util.concurrent.Phaser;
public class PhaserParentChildTest {
public static void main(String[] args) {
/*
* Creates a new phaser with no registered unArrived parties.
*/
Phaser parentPhaser = new Phaser();
/*
* Creates a new phaser with the given parent &
* no registered unArrived parties.
*/
Phaser childPhaser = new Phaser(parentPhaser,0);
childPhaser.register();
System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());
childPhaser.arriveAndDeregister();
System.out.println("\n--childPhaser has called arriveAndDeregister()-- \n");
System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated());
System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated());
}
}
Подробнее о Phaser на http://www.javamadesoeasy.com/2015/03/phaser-in-java_21.html