Существует ли ExecutorService, который использует текущий поток?
То, что мне нужно, - это совместимый способ настройки использования пула потоков или нет. В идеале остальная часть кода не должна затрагиваться вообще. Я мог бы использовать пул потоков с 1 потоком, но это не совсем то, что я хочу. Любые идеи?
ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);
// es.execute / es.submit / new ExecutorCompletionService(es) etc
Ответы
Ответ 1
Здесь действительно простая реализация Executor
(not ExecutorService
, mind you), которая использует только текущий поток. Кража этого из "Java Concurrency на практике" (существенное чтение).
public class CurrentThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
ExecutorService
- более сложный интерфейс, но его можно обрабатывать с использованием того же подхода.
Ответ 2
Вы можете использовать Guava MoreExecutors.newDirectExecutorService()
или MoreExecutors.directExecutor()
если вам не нужен ExecutorService
.
Если включить Guava слишком тяжело, вы можете реализовать что-то почти так же хорошо:
public final class SameThreadExecutorService extends ThreadPoolExecutor {
private final CountDownLatch signal = new CountDownLatch(1);
private SameThreadExecutorService() {
super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override public void shutdown() {
super.shutdown();
signal.countDown();
}
public static ExecutorService getInstance() {
return SingletonHolder.instance;
}
private static class SingletonHolder {
static ExecutorService instance = createInstance();
}
private static ExecutorService createInstance() {
final SameThreadExecutorService instance
= new SameThreadExecutorService();
// The executor has one worker thread. Give it a Runnable that waits
// until the executor service is shut down.
// All other submitted tasks will use the RejectedExecutionHandler
// which runs tasks using the caller thread.
instance.submit(new Runnable() {
@Override public void run() {
boolean interrupted = false;
try {
while (true) {
try {
instance.signal.await();
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}});
return Executors.unconfigurableScheduledExecutorService(instance);
}
}
Ответ 3
Стиль Java 8:
Executor e = Runnable::run;
Ответ 4
Я написал ExecutorService
основанный на AbstractExecutorService
.
/**
* Executes all submitted tasks directly in the same thread as the caller.
*/
public class SameThreadExecutorService extends AbstractExecutorService {
//volatile because can be viewed by other threads
private volatile boolean terminated;
@Override
public void shutdown() {
terminated = true;
}
@Override
public boolean isShutdown() {
return terminated;
}
@Override
public boolean isTerminated() {
return terminated;
}
@Override
public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
return terminated;
}
@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}
@Override
public void execute(Runnable theCommand) {
theCommand.run();
}
}
Ответ 5
Вы можете использовать RejectedExecutionHandler для запуска задачи в текущем потоке.
public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
}
});
Вам нужен только один из них.
Ответ 6
Мне пришлось использовать тот же "CurrentThreadExecutorService" для тестирования, и хотя все предлагаемые решения были хорошими (особенно упоминание пути Guava), я подошел с чем-то похожим на то, что Питер Лори предложил здесь.
Как уже упоминалось Axelle Ziegler here, к сожалению, решение Peter на самом деле не работает из-за проверки, введенной в ThreadPoolExecutor
в параметре конструктора maximumPoolSize
(т.е. maximumPoolSize
не может быть <=0
).
Чтобы обойти это, я сделал следующее:
private static ExecutorService currentThreadExecutorService() {
CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
@Override
public void execute(Runnable command) {
callerRunsPolicy.rejectedExecution(command, this);
}
};
}