Объединение асинхронного вычисления в синхронное (блокирующее) вычисление
похожие вопросы:
У меня есть объект с методом, который я хотел бы представить клиентам библиотеки (особенно клиентам сценариев) как что-то вроде:
interface MyNiceInterface
{
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
public Future<Baz> doSomething(Foo fooArg, Bar barArg);
// doSomethingAndBlock is the straightforward way;
// doSomething has more control but deals with
// a Future and that might be too much hassle for
// scripting clients
}
но примитивный "материал", который у меня есть, представляет собой набор управляемых событиями классов:
interface BazComputationSink
{
public void onBazResult(Baz result);
}
class ImplementingThing
{
public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}
где ImplementingThing принимает входные данные, выполняет некоторые непонятные вещи, такие как постановка в очередь задач в очереди задач, а затем, когда возникает результат, sink.onBazResult()
в потоке, который может быть или не быть тем же потоком, что ImplementingThing.doSomethingAsync() назывался.
Есть ли способ, которым я могу использовать функции, управляемые событиями, которые у меня есть, наряду с примитивами параллелизма, для реализации MyNiceInterface, чтобы клиенты сценариев могли счастливо ждать в блокирующем потоке?
редактировать: я могу использовать FutureTask для этого?
Ответы
Ответ 1
Использование вашей собственной будущей реализации:
public class BazComputationFuture implements Future<Baz>, BazComputationSink {
private volatile Baz result = null;
private volatile boolean cancelled = false;
private final CountDownLatch countDownLatch;
public BazComputationFuture() {
countDownLatch = new CountDownLatch(1);
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
} else {
countDownLatch.countDown();
cancelled = true;
return !isDone();
}
}
@Override
public Baz get() throws InterruptedException, ExecutionException {
countDownLatch.await();
return result;
}
@Override
public Baz get(final long timeout, final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
countDownLatch.await(timeout, unit);
return result;
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isDone() {
return countDownLatch.getCount() == 0;
}
public void onBazResult(final Baz result) {
this.result = result;
countDownLatch.countDown();
}
}
public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
BazComputationFuture future = new BazComputationFuture();
doSomethingAsync(fooArg, barArg, future);
return future;
}
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
return doSomething(fooArg, barArg).get();
}
Решение создает встроенный CountDownLatch, который очищается после получения обратного вызова. Если пользователь вызывает get, CountDownLatch используется для блокировки вызывающего потока до завершения вычисления и вызова обратного вызова onBazResult. CountDownLatch гарантирует, что если обратный вызов возникает до того, как get() вызывается, метод get() немедленно вернется с результатом.
Ответ 2
Ну, есть простое решение сделать что-то вроде:
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
final AtomicReference<Baz> notifier = new AtomicReference();
doSomethingAsync(fooArg, barArg, new BazComputationSink() {
public void onBazResult(Baz result) {
synchronized (notifier) {
notifier.set(result);
notifier.notify();
}
}
});
synchronized (notifier) {
while (notifier.get() == null)
notifier.wait();
}
return notifier.get();
}
Конечно, это предполагает, что ваш результат Baz
никогда не будет равен нулю...
Ответ 3
В google guava library есть простой в использовании SettableFuture, который делает эту проблему очень простой (около 10 строк кода).
public class ImplementingThing {
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
try {
return doSomething(fooArg, barArg).get();
} catch (Exception e) {
throw new RuntimeException("Oh dear");
}
};
public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
final SettableFuture<Baz> future = new SettableFuture<Baz>();
doSomethingAsync(fooArg, barArg, new BazComputationSink() {
@Override
public void onBazResult(Baz result) {
future.set(result);
}
});
return future;
};
// Everything below here is just mock stuff to make the example work,
// so you can copy it into your IDE and see it run.
public static class Baz {}
public static class Foo {}
public static class Bar {}
public static interface BazComputationSink {
public void onBazResult(Baz result);
}
public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Baz baz = new Baz();
sink.onBazResult(baz);
}
}).start();
};
public static void main(String[] args) {
System.err.println("Starting Main");
System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
System.err.println("Ending Main");
}
Ответ 4
Очень простой пример: просто понять CountDownLatch без каких-либо дополнительный код.
A java.util.concurrent.CountDownLatch
представляет собой конструкцию concurrency, которая позволяет одному или нескольким потокам ждать завершения заданного набора операций.
A CountDownLatch
инициализируется данным подсчетом. Этот счет уменьшается с помощью вызовов метода countDown()
. Темы, ожидающие, что этот счет достигнет нуля, могут вызвать один из методов await()
. Вызов await()
блокирует поток до тех пор, пока счетчик не достигнет нуля.
Ниже приведен простой пример. После того, как Decrementer вызвал countDown()
3 раза на CountDownLatch
, ожидающий официант освобождается от вызова await()
.
Вы также можете упомянуть некоторые TimeOut
для ожидания.
CountDownLatch latch = new CountDownLatch(3);
Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);
new Thread(waiter) .start();
new Thread(decrementer).start();
Thread.sleep(4000);
public class Waiter implements Runnable{
CountDownLatch latch = null;
public Waiter(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
//--------------
public class Decrementer implements Runnable {
CountDownLatch latch = null;
public Decrementer(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
Thread.sleep(1000);
this.latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Ссылка
Если вы не хотите использовать CountDownLatch
, или ваше требование - это то же самое, что и Facebook, и в отличие от функциональности. Значит, если вызывается один метод, не вызывайте другой метод.
В этом случае вы можете объявить
private volatile Boolean isInprocessOfLikeOrUnLike = false;
а затем вы можете проверить начало вызова метода, если он false
, тогда метод вызова иначе возвращает.. зависит от вашей реализации.
Ответ 5
Это очень просто с RxJava 2.x:
try {
Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
.toFuture().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
Или без лямбда-нотации:
Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
@Override
public void subscribe(SingleEmitter<Baz> emitter) {
doSomethingAsync(fooArg, barArg, new BazComputationSink() {
@Override
public void onBazResult(Baz result) {
emitter.onSuccess(result);
}
});
}
}).toFuture().get();
Еще проще:
Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
.blockingGet();
Ответ 6
Здесь более общее решение, основанное на Поле Вагланде, отвечает:
public abstract class AsyncRunnable<T> {
protected abstract void run(AtomicReference<T> notifier);
protected final void finish(AtomicReference<T> notifier, T result) {
synchronized (notifier) {
notifier.set(result);
notifier.notify();
}
}
public static <T> T wait(AsyncRunnable<T> runnable) {
final AtomicReference<T> notifier = new AtomicReference<>();
// run the asynchronous code
runnable.run(notifier);
// wait for the asynchronous code to finish
synchronized (notifier) {
while (notifier.get() == null) {
try {
notifier.wait();
} catch (InterruptedException ignore) {}
}
}
// return the result of the asynchronous code
return notifier.get();
}
}
Вот пример того, как его использовать::
String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
@Override
public void run(final AtomicReference<String> notifier) {
// here goes your async code, e.g.:
new Thread(new Runnable() {
@Override
public void run() {
finish(notifier, "This was a asynchronous call!");
}
}).start();
}
});
Более подробную версию кода можно найти здесь: http://pastebin.com/hKHJUBqE
EDIT:
Пример, связанный с вопросом:
public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
@Override
protected void run(final AtomicReference<Baz> notifier) {
doSomethingAsync(fooArg, barArg, new BazComputationSink() {
public void onBazResult(Baz result) {
synchronized (notifier) {
notifier.set(result);
notifier.notify();
}
}
});
}
});
}
Ответ 7
Самый простой способ (который работает для меня) заключается в
- Создать очередь блокировки
- Вызовите асинхронный метод - используйте обработчик, который предлагает результат в эту очередь блокировки.
-
Опрос очереди (то, где вы блокируете) для результата.
public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
final BlockingQueue<Baz> blocker = new LinkedBlockingQueue();
doSomethingAsync(fooArg, barArg, blocker::offer);
// Now block until response or timeout
return blocker.poll(30, TimeUnit.SECONDS);
}