Как реализовать простую резьбу с фиксированным числом рабочих потоков
Я ищу простейший, самый простой способ реализовать следующее:
- Основная программа создает экземпляр рабочего
потоки для выполнения задачи.
- Только задачи
n
могут запускаться сразу.
- Когда
n
достигнуто, больше нет рабочих
начинаются до тех пор, пока
текущие потоки опускаются ниже n
.
Ответы
Ответ 1
Я думаю, что Executors.newFixedThreadPool соответствует вашим требованиям. Существует несколько способов использования полученного ExecutorService в зависимости от того, хотите ли вы, чтобы результат возвращался к основному потоку или была ли эта задача полностью автономной, и есть ли у вас набор задач для выполнения переднего плана или задаются ли задачи в зависимости от какого-либо события.
Collection<YourTask> tasks = new ArrayList<YourTask>();
YourTask yt1 = new YourTask();
...
tasks.add(yt1);
...
ExecutorService exec = Executors.newFixedThreadPool(5);
List<Future<YourResultType>> results = exec.invokeAll(tasks);
В качестве альтернативы, если у вас есть новая асинхронная задача для выполнения в ответ на какое-либо событие, вы, вероятно, просто захотите использовать простой метод execute(Runnable)
ExecutorService.
Ответ 2
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
/* ...execute the task to run concurrently as a runnable: */
exec.execute(new Runnable() {
public void run() {
/* do the work to be done in its own thread */
System.out.println("Running in: " + Thread.currentThread());
}
});
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
/* The tasks are now running concurrently. We wait until all work is done,
* with a timeout of 50 seconds: */
boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
/* If the execution timed out, false is returned: */
System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
Ответ 3
Executors.newFixedThreadPool(int)
Executor executor = Executors.newFixedThreadPool(n);
Runnable runnable = new Runnable() {
public void run() {
// do your thing here
}
}
executor.execute(runnable);
Ответ 4
Использовать инфраструктуру Executor; а именно newFixedThreadPool (N)
Ответ 5
-
Если ваша очередь задач не будет неограниченной, а задачи могут выполняться с более короткими временными интервалами, вы можете использовать Executors.newFixedThreadPool(n)
; как предлагает эксперт.
Единственным недостатком этого решения является неограниченный размер очереди задач. У вас нет контроля над этим. Огромная скопище в очереди задач ухудшит производительность приложения и может привести к нехватке памяти в некоторых сценариях.
-
Если вы хотите использовать ExecutorService
и включите механизм work stealing
, где потоки рабочего потока совместно используют рабочую нагрузку из занятых рабочих потоков путем кражи задач в очереди задач. Он вернет тип службы ForkJoinPool.
public static ExecutorService newWorkStealingPool
(int parallelism)
Создает пул потоков, который поддерживает достаточное количество потоков для поддержки заданного уровня parallelism и может использовать несколько очередей для сокращения конкуренции. Уровень parallelism соответствует максимальному количеству потоков, активно вовлеченных в процесс обработки задач или доступных для участия. Фактическое количество потоков может расти и динамически сокращаться. Пул поиска работы не дает никаких гарантий относительно порядка выполнения поставленных задач.
-
Я предпочитаю ThreadPoolExecutor
из-за гибкости API-интерфейсов для управления множеством параметров, который контролирует выполнение задачи потока.
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
в вашем случае, установите оба corePoolSize and maximumPoolSize as N
. Здесь вы можете контролировать размер очереди задач, определять собственный собственный поток factory и политику обработчика отклонения.
Посмотрите на связанный вопрос SE, чтобы динамически контролировать размер пула:
Динамический пул потоков
Ответ 6
Если вы хотите сделать свой собственный:
private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);
private boolean roomLeft() {
synchronized (workers) {
return (workers.size() < MAX_WORKERS);
}
}
private void addWorker() {
synchronized (workers) {
workers.add(new Worker(this));
}
}
public void removeWorker(Worker worker) {
synchronized (workers) {
workers.remove(worker);
}
}
public Example() {
while (true) {
if (roomLeft()) {
addWorker();
}
}
}
Где Worker - ваш класс, который расширяет Thread. Каждый рабочий вызовет этот метод removeWorker класса, передав его в качестве параметра, когда он закончит делать это.
С учетом сказанного структура Executor выглядит намного лучше.
Изменить: кто-нибудь хочет объяснить, почему это так плохо, вместо того, чтобы просто понижать его?
Ответ 7
Как упоминалось выше, лучше всего создать пул потоков с Executors класс:
Однако, если вы хотите сворачивать самостоятельно, этот код должен дать вам представление о том, как действовать. В принципе, просто добавьте каждый новый поток в группу потоков и убедитесь, что в группе никогда не было более N активных потоков:
Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
if( group.activeCount()<N && i<tasks.length ) {
new TaskThread(group, tasks[i]).start();
i++;
} else {
Thread.sleep(100);
}
}