Как дождаться завершения ряда потоков?
Каков способ просто дождаться завершения всего процесса с резьбой? Например, допустим, что у меня есть:
public class DoSomethingInAThread implements Runnable{
public static void main(String[] args) {
for (int n=0; n<1000; n++) {
Thread t = new Thread(new DoSomethingInAThread());
t.start();
}
// wait for all threads' run() methods to complete before continuing
}
public void run() {
// do something here
}
}
Как мне изменить это, чтобы метод main()
останавливался в комментарии до тех пор, пока не исчезнут все методы run()
нитей? Спасибо!
Ответы
Ответ 1
Вы помещаете все потоки в массив, запускаете их все, а затем имеете петлю
for(i = 0; i < threads.length; i++)
threads[i].join();
Каждое соединение будет блокироваться до завершения соответствующего потока. Потоки могут выполняться в другом порядке, чем вы их присоединяете, но это не проблема: когда цикл выходит, все потоки завершены.
Ответ 2
Один из способов - создать List
из Thread
s, создать и запустить каждый поток, добавив его в список. Как только все будет запущено, пройдите по списку и вызовите join()
на каждом из них. Неважно, в каком порядке заканчиваются потоки, все, что вам нужно знать, - это то, что к тому времени, когда второй цикл завершит выполнение, каждый поток будет завершен.
Лучший подход - использовать ExecutorService и связанные с ним методы:
List<Callable> callables = ... // assemble list of Callables here
// Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
// bother saving them
Использование ExecutorService (и все новые вещи из Java 5 concurrency утилиты) невероятно гибки, и приведенный выше пример едва царапины поверхности.
Ответ 3
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class DoSomethingInAThread implements Runnable
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//limit the number of actual threads
int poolSize = 10;
ExecutorService service = Executors.newFixedThreadPool(poolSize);
List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();
for (int n = 0; n < 1000; n++)
{
Future f = service.submit(new DoSomethingInAThread());
futures.add(f);
}
// wait for all tasks to complete before continuing
for (Future<Runnable> f : futures)
{
f.get();
}
//shut down the executor service so that this thread can exit
service.shutdownNow();
}
public void run()
{
// do something here
}
}
Ответ 4
Избегайте класса Thread в целом и вместо этого используйте более высокие абстракции, предоставляемые в java.util.concurrent
Класс ExecutorService предоставляет метод invokeAll, который, похоже, делает именно то, что вы хотите.
Ответ 5
вместо join()
, который является старым API, вы можете использовать CountDownLatch. Я изменил ваш код, как показано ниже, чтобы выполнить ваши требования.
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch){
this.latch = latch;
}
public void run() {
try{
System.out.println("Do some thing");
latch.countDown();
}catch(Exception err){
err.printStackTrace();
}
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
try{
CountDownLatch latch = new CountDownLatch(1000);
for (int n=0; n<1000; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of 1000 threads");
}catch(Exception err){
err.printStackTrace();
}
}
}
Объяснение
-
CountDownLatch
был инициализирован с заданным числом 1000 согласно вашему требованию.
-
Каждый рабочий поток DoSomethingInAThread
будет декрементировать CountDownLatch
, который был передан в конструкторе.
-
Основной поток CountDownLatchDemo
await()
до тех пор, пока счетчик не станет равным нулю. После того, как счетчик станет нулевым, вы получите ниже строки на выходе.
In Main thread after completion of 1000 threads
Дополнительная информация с страницы документации oracle
public void await()
throws InterruptedException
Заставляет текущий поток ждать, пока защелка не спустится до нуля, если поток не прерван.
Обратитесь к соответствующему запросу SE для других параметров:
дождитесь, пока все потоки закончат работу в java
Ответ 6
Как предположил Мартин К, java.util.concurrent.CountDownLatch
представляется лучшим решением для этого. Просто добавив пример для того же
public class CountDownLatchDemo
{
public static void main (String[] args)
{
int noOfThreads = 5;
// Declare the count down latch based on the number of threads you need
// to wait on
final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
for (int i = 0; i < noOfThreads; i++)
{
new Thread()
{
@Override
public void run ()
{
System.out.println("I am executed by :" + Thread.currentThread().getName());
try
{
// Dummy sleep
Thread.sleep(3000);
// One thread has completed its job
executionCompleted.countDown();
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}.start();
}
try
{
// Wait till the count down latch opens.In the given case till five
// times countDown method is invoked
executionCompleted.await();
System.out.println("All over");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
Ответ 7
Рассмотрим использование java.util.concurrent.CountDownLatch
. Примеры в javadocs
Ответ 8
Если вы создадите список потоков, вы можете прокручивать их и .join() против каждого, и ваш цикл будет завершен, когда все потоки будут. Я не пробовал, хотя.
http://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#join()
Ответ 9
В зависимости от ваших потребностей вы также можете проверить классы CountDownLatch и CyclicBarrier в пакете java.util.concurrent. Они могут быть полезны, если вы хотите, чтобы ваши потоки подождали друг друга или вам нужен более тонкий контроль над тем, как выполняются ваши потоки (например, ожидание во внутреннем исполнении для другого потока для установки некоторого состояния). Вы также можете использовать CountDownLatch для одновременного запуска всех ваших потоков, а не запускать их один за другим, когда вы повторяете цикл. В стандартном документе API есть пример этого, плюс использование другого CountDownLatch для ожидания завершения всех потоков.
Ответ 10
Это будет комментарий, но я пока не могу комментировать комментарии.
Мартин K. Мне интересно, как вы будете использовать ThreadGroup
. Вы делали это раньше?
Я вижу, что выше, вы предлагаете проверить activeCount
- отключение Мартин v Löwis озабоченность в отношении опроса на данный момент, я есть еще одна проблема с activeCount
.
Предостережение: я не пробовал использовать это, поэтому я не эксперт по этому вопросу, но, согласно javadocs, он возвращает оценку количества активных потоков.
Лично мне было бы жалко пытаться построить систему с оценкой. У вас есть еще одна мысль о том, как это сделать, или я неправильно читаю javadoc?
Ответ 11
Создайте объект потока внутри первого цикла for.
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
public void run() {
// some code to run in parallel
}
});
threads[i].start();
}
И тогда так все говорят здесь.
for(i = 0; i < threads.length; i++)
threads[i].join();
Ответ 12
Вы можете сделать это с помощью Object "ThreadGroup" и его параметра activeCount:
Ответ 13
В качестве альтернативы CountDownLatch вы также можете использовать CyclicBarrier, например.
public class ThreadWaitEx {
static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
public void run(){
System.out.println("clean up job after all tasks are done.");
}
});
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new MyCallable(barrier));
t.start();
}
}
}
class MyCallable implements Runnable{
private CyclicBarrier b = null;
public MyCallable(CyclicBarrier b){
this.b = b;
}
@Override
public void run(){
try {
//do something
System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
b.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
Чтобы использовать CyclicBarrier в этом случае, bar.await() должен быть последним утверждением, т.е. когда ваш поток выполняется с его работой. CyclicBarrier можно использовать снова с помощью метода reset(). Чтобы процитировать javadocs:
A CyclicBarrier поддерживает необязательную команду Runnable, которая запускается один раз за барьерную точку, после того, как последний поток в партии прибывает, но до того, как будут выпущены какие-либо потоки. Это действие барьера полезно для обновления состояния shared-state до того, как какая-либо из сторон продолжит работу.
Ответ 14
Объединение join()
не помогло мне. посмотрите этот образец в Kotlin:
val timeInMillis = System.currentTimeMillis()
ThreadUtils.startNewThread(Runnable {
for (i in 1..5) {
val t = Thread(Runnable {
Thread.sleep(50)
var a = i
kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
Thread.sleep(200)
for (j in 1..5) {
a *= j
Thread.sleep(100)
kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
}
kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
})
t.start()
}
})
Результат:
Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765
Теперь позвольте мне использовать join()
для потоков:
val timeInMillis = System.currentTimeMillis()
ThreadUtils.startNewThread(Runnable {
for (i in 1..5) {
val t = Thread(Runnable {
Thread.sleep(50)
var a = i
kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
Thread.sleep(200)
for (j in 1..5) {
a *= j
Thread.sleep(100)
kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
}
kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
})
t.start()
t.join()
}
})
И результат:
Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833
Как понятно, когда мы используем join
:
- Потоки работают последовательно.
- Первый образец занимает 765 миллисекунд, а второй - 3833 миллисекунды.
Нашим решением для предотвращения блокировки других потоков было создание ArrayList:
val threads = ArrayList<Thread>()
Теперь, когда мы хотим начать новый поток, мы чаще всего добавляем его в ArrayList:
addThreadToArray(
ThreadUtils.startNewThread(Runnable {
...
})
)
Функция addThreadToArray
:
@Synchronized
fun addThreadToArray(th: Thread) {
threads.add(th)
}
startNewThread
funstion:
fun startNewThread(runnable: Runnable) : Thread {
val th = Thread(runnable)
th.isDaemon = false
th.priority = Thread.MAX_PRIORITY
th.start()
return th
}
Проверьте завершение потоков, как показано ниже, везде, где это необходимо:
val notAliveThreads = ArrayList<Thread>()
for (t in threads)
if (!t.isAlive)
notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
// The size is 0 -> there is no alive threads.
}