Исполнители Java: как быть уведомленным, без блокировки, когда задача завершается?
Скажем, у меня есть очередь, полная задач, которые мне нужно отправить в службу исполнителя. Я хочу, чтобы они обрабатывались по одному. Самый простой способ, о котором я могу думать, это:
- Возьмите задачу из очереди
- Отправить его исполнителю
- Вызовите .get в возвращаемом будущем и заблокируйте, пока не будет доступен результат.
- Возьмите еще одну задачу из очереди...
Однако я стараюсь полностью не блокировать. Если у меня 10 000 таких очередей, для которых их задачи обрабатываются по одному, у меня закончится пространство стека, потому что большинство из них будет удерживать блокированные потоки.
Я хотел бы отправить задание и предоставить обратный вызов, который вызывается, когда задача завершена. Я буду использовать это уведомление обратного вызова в качестве флага для отправки следующей задачи. (Функциональные и реактивные джойстики, по-видимому, используют такие неблокирующие алгоритмы, но я не могу понять их код)
Как я могу это сделать с помощью JDK java.util.concurrent, за исключением написания моей собственной службы-исполнителя?
(очередь, которая передает мне эти задачи, может сама блокироваться, но это проблема, которая будет решена позже)
Ответы
Ответ 1
Определите интерфейс обратного вызова для получения любых параметров, которые вы хотите передать в уведомлении о завершении. Затем вызовите его в конце задачи.
Вы даже можете написать общую оболочку для задач Runnable и отправить их в ExecutorService
. Или, см. Ниже механизм, встроенный в Java 8.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
С CompletableFuture
Java 8 включал более сложные средства для компоновки конвейеров, где процессы могут выполняться асинхронно и условно. Здесь надуманный, но полный пример уведомления.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
Ответ 2
Используйте API будущего API, поддерживающий Guava, и добавьте обратный вызов. Ср с веб-сайта:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
Ответ 3
В Java 8 вы можете использовать CompletableFuture. Вот пример, который у меня был в моем коде, где я использую его для извлечения пользователей из моей пользовательской службы, сопоставления их с моими объектами представления, а затем обновления моего представления или отображения диалогового окна с ошибкой (это приложение графического интерфейса):
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
Выполняется асинхронно. Я использую два частных метода: mapUsersToUserViews
и updateView
.
Ответ 4
Вы можете расширить класс FutureTask
и переопределить метод done()
, а затем добавить объект FutureTask
в ExecutorService
, поэтому метод done()
будет вызываться, когда FutureTask
будет выполнен немедленно.
Ответ 5
ThreadPoolExecutor
также имеет методы beforeExecute
и afterExecute
hook, которые вы можете переопределить и использовать. Вот описание из ThreadPoolExecutor
Javadocs.
Методы крюка
Этот класс обеспечивает защищенный переопределяемый beforeExecute(java.lang.Thread, java.lang.Runnable)
и afterExecute(java.lang.Runnable, java.lang.Throwable)
, которые вызывают до и после выполнения каждой задачи. Они могут использоваться для управления средой исполнения; например, повторная инициализация ThreadLocals
, сбор статистики или добавление записей журнала. Кроме того, метод terminated()
может быть переопределен для выполнения любой специальной обработки, которая должна быть выполнена после полного завершения Executor
. Если методы перехвата или обратного вызова генерируют исключения, внутренние рабочие потоки могут, в свою очередь, терпеть неудачу и внезапно завершаться.
Ответ 6
Используйте CountDownLatch
.
Это от java.util.concurrent
, и это именно так, чтобы дождаться завершения нескольких потоков, прежде чем продолжить.
Для достижения эффекта обратного вызова, который вам нужен, это требует дополнительной дополнительной дополнительной работы. А именно, обрабатывая это самостоятельно в отдельном потоке, который использует CountDownLatch
и ждет на нем, а затем продолжает уведомлять о том, что вам нужно сообщить. Не существует встроенной поддержки обратного вызова или чего-либо подобного этому эффекту.
ИЗМЕНИТЬ: Теперь, когда я еще больше понимаю ваш вопрос, я думаю, что вы слишком далеко, без необходимости. Если вы возьмете обычный SingleThreadExecutor
, задайте ему все задачи, и он будет выполнять очередь изначально.
Ответ 7
Если вы хотите, чтобы задачи одновременно не выполнялись, используйте SingleThreadedExecutor. Задачи будут обрабатываться в том порядке, в котором они представлены. Вам даже не нужно выполнять задачи, просто отправьте их в exec.
Ответ 8
Просто добавьте ответ Мэтта, который помог, вот более примерный пример, показывающий использование обратного вызова.
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}
Вывод:
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
Ответ 9
Вы можете использовать реализацию Callable, чтобы
public class MyAsyncCallable<V> implements Callable<V> {
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}
public V call() throws Exception {
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}
где CallbackInterface - это нечто очень основное, например
public interface CallbackInterface {
public int doSomething(int a, int b);
}
и теперь основной класс будет выглядеть следующим образом:
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Ответ 10
Это расширение для ответа Pache с использованием Guava ListenableFuture
.
В частности, Futures.transform()
возвращает ListenableFuture
, поэтому его можно использовать для цепочки асинхронных вызовов. Futures.addCallback()
возвращает void
, поэтому не может использоваться для цепочки, но хорош для обработки успеха/сбоя при завершении асинхронизации.
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(new Callable<Database>() {
public Database call() {
// Let assume this call is async, i.e. returns a ListenableFuture<Database>
return openDatabase();
}
});
// ListenableFuture2: Query Database for Cursor rows
cursor = Futures.transform(database, new AsyncFunction<Database, Cursor>() {
@Override public ListenableFuture<Cursor> apply(Database database) {
// Let assume this call is async, i.e. returns a ListenableFuture<Cursor>
return database.query(table, columns, selection, args);
}
});
// ListenableFuture3: Convert Cursor rows to List<FooObject>
fooList = Futures.transform(cursor, new Function<Cursor, List<FooObject>>() {
@Override public List<FooObject> apply(Cursor cursor) {
// Let assume this call is synchronous, i.e. directly returns List<FooObject>
return cursorToFooList(cursor);
}
});
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<FooObject>>() {
public void onSuccess(List<FooObject> fooObjects) {
doSomethingWith(fooObjects);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});
ПРИМЕЧАНИЕ. В дополнение к цепочке асинхронных задач Futures.transform()
также позволяет планировать каждую задачу на отдельном исполнителе (не показано в этом примере).
Ответ 11
Простой код для реализации механизма Callback
с использованием ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}
выход:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
Ключевые примечания:
- Если вы хотите последовательно выполнять задачи процесса в порядке FIFO, замените
newFixedThreadPool(5)
на newFixedThreadPool(1)
-
Если вы хотите обработать следующую задачу после анализа результата из Callback
предыдущей задачи, просто оставьте комментарий ниже строки
//System.out.println("future status:"+future.get()+":"+future.isDone());
-
Вы можете заменить newFixedThreadPool()
на один из
Executors.newCachedThreadPool()
Executors.newWorkStealingPool()
ThreadPoolExecutor
в зависимости от вашего варианта использования.
-
Если вы хотите обработать метод обратного вызова асинхронно
а. Передайте общую задачу ExecutorService or ThreadPoolExecutor
to Callable
б. Преобразуйте метод Callable
в Callable/Runnable
задачу
с. Задайте задачу обратного вызова на ExecutorService or ThreadPoolExecutor