Каков самый простой способ распараллеливать задачу в java?
Скажем, у меня есть такая задача:
for(Object object: objects) {
Result result = compute(object);
list.add(result);
}
Какой самый простой способ распараллелить каждый compute() (если они уже распараллеливаются)?
Мне не нужен ответ, который строго соответствует приведенному выше коду, просто общий ответ. Но если вам нужна дополнительная информация: мои задачи связаны с вводом-выводом, и это для веб-приложения Spring, и задачи будут выполняться в HTTP-запросе.
Ответы
Ответ 1
Я бы порекомендовал взглянуть на ExecutorService.
В частности, что-то вроде этого:
ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
Callable<Result> c = new Callable<Result>() {
@Override
public Result call() throws Exception {
return compute(object);
}
};
tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);
Обратите внимание, что использование newCachedThreadPool
может быть плохим, если objects
- большой список. Пул кэшированных потоков может создать поток на одну задачу! Вы можете использовать newFixedThreadPool(n)
, где n - это что-то разумное (например, количество ядер, которое у вас есть, при условии, что compute()
связано с ЦП).
Здесь полный код, который фактически запускается:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceExample {
private static final Random PRNG = new Random();
private static class Result {
private final int wait;
public Result(int code) {
this.wait = code;
}
}
public static Result compute(Object obj) throws InterruptedException {
int wait = PRNG.nextInt(3000);
Thread.sleep(wait);
return new Result(wait);
}
public static void main(String[] args) throws InterruptedException,
ExecutionException {
List<Object> objects = new ArrayList<Object>();
for (int i = 0; i < 100; i++) {
objects.add(new Object());
}
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object : objects) {
Callable<Result> c = new Callable<Result>() {
@Override
public Result call() throws Exception {
return compute(object);
}
};
tasks.add(c);
}
ExecutorService exec = Executors.newCachedThreadPool();
// some other exectuors you could try to see the different behaviours
// ExecutorService exec = Executors.newFixedThreadPool(3);
// ExecutorService exec = Executors.newSingleThreadExecutor();
try {
long start = System.currentTimeMillis();
List<Future<Result>> results = exec.invokeAll(tasks);
int sum = 0;
for (Future<Result> fr : results) {
sum += fr.get().wait;
System.out.println(String.format("Task waited %d ms",
fr.get().wait));
}
long elapsed = System.currentTimeMillis() - start;
System.out.println(String.format("Elapsed time: %d ms", elapsed));
System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
} finally {
exec.shutdown();
}
}
}
Ответ 2
Для получения более подробного ответа прочитайте Java Concurrency на практике и используйте java.util.concurrent.
Ответ 3
Здесь я использую в своих проектах:
public class ParallelTasks
{
private final Collection<Runnable> tasks = new ArrayList<Runnable>();
public ParallelTasks()
{
}
public void add(final Runnable task)
{
tasks.add(task);
}
public void go() throws InterruptedException
{
final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
try
{
final CountDownLatch latch = new CountDownLatch(tasks.size());
for (final Runnable task : tasks)
threads.execute(new Runnable() {
public void run()
{
try
{
task.run();
}
finally
{
latch.countDown();
}
}
});
latch.await();
}
finally
{
threads.shutdown();
}
}
}
// ...
public static void main(final String[] args) throws Exception
{
ParallelTasks tasks = new ParallelTasks();
final Runnable waitOneSecond = new Runnable() {
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
}
}
};
tasks.add(waitOneSecond);
tasks.add(waitOneSecond);
tasks.add(waitOneSecond);
tasks.add(waitOneSecond);
final long start = System.currentTimeMillis();
tasks.go();
System.err.println(System.currentTimeMillis() - start);
}
Что печатает чуть больше 2000 на моем двухъядерном ящике.
Ответ 4
Вы можете использовать ThreadPoolExecutor. Вот пример кода: http://programmingexamples.wikidot.com/threadpoolexecutor (слишком долго, чтобы привести его сюда)
Ответ 5
Можно просто создать несколько потоков и получить результат.
Thread t = new Mythread(object);
if (t.done()) {
// get result
// add result
}
EDIT: Я думаю, что другие решения более прохладные.
Ответ 6
Я собирался упомянуть класс исполнителя. Вот пример кода, который вы бы разместили в классе исполнителя.
private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);
private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();
public void addCallable(Callable<Object> callable) {
this.callableList.add(callable);
}
public void clearCallables(){
this.callableList.clear();
}
public void executeThreads(){
try {
threadLauncher.invokeAll(this.callableList);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public Object[] getResult() {
List<Future<Object>> resultList = null;
Object[] resultArray = null;
try {
resultList = threadLauncher.invokeAll(this.callableList);
resultArray = new Object[resultList.size()];
for (int i = 0; i < resultList.size(); i++) {
resultArray[i] = resultList.get(i).get();
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return resultArray;
}
Затем, чтобы использовать его, вы должны вызвать вызовы в класс исполнителя, чтобы заполнить и выполнить его.
executor.addCallable( some implementation of callable) // do this once for each task
Object[] results = executor.getResult();
Ответ 7
Параллельный массив Fork/Join - это один из вариантов
Ответ 8
В Java8 и более поздних версиях вы можете создать поток, а затем выполнить обработку параллельно с parallelStream:
List<T> objects = ...;
List<Result> result = objects.parallelStream().map(object -> {
return compute(object);
}).collect(Collectors.toList());
Примечание. Порядок результатов может не соответствовать порядку объектов в списке.
Подробная информация о том, как настроить правильное количество потоков, доступна в этом вопросе о том, сколько потоков создано в параллельном потоке в java-8.