RxJava и параллельное выполнение кода наблюдателя
У меня есть следующий код с использованием RxJava Observable api:
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
.buffer(10000)
.observeOn(Schedulers.computation())
.subscribe(recordInfo -> {
_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
for(Info info : recordInfo) {
// some I/O operation logic
}
},
exception -> {
},
() -> {
});
Я ожидал, что код наблюдения, то есть код внутри метода subscribe(), будет выполняться параллельно после того, как я задал планировщик вычислений. Вместо этого код по-прежнему выполняется последовательно в одном потоке. Как заставить код запускаться параллельно с помощью RxJava api.
Ответы
Ответ 1
RxJava часто неправильно понимается, когда речь идет об асинхронных/многопоточных аспектах. Кодирование многопоточных операций прост, но понимание абстракции - это еще одна вещь.
Общий вопрос о RxJava заключается в том, как добиться параллелизации или испускать сразу несколько элементов из Observable. Конечно, это определение нарушает Наблюдаемый контракт, в котором говорится, что onNext() необходимо вызывать последовательно и никогда одновременно не более чем по одному потоку за раз.
Для достижения parallelism вам понадобятся несколько Observables.
Это выполняется в одном потоке:
Observable<Integer> vals = Observable.range(1,10);
vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
Это выполняется в нескольких потоках:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
Код и текст поступает из этого сообщения в блоге.
Ответ 2
В RxJava 2.0.5 представлены потоки параллелизма и ParallelFlowable, которые делают параллельное выполнение более простым и более декларативным.
Вам больше не нужно создавать Observable
/Flowable
в flatMap
, вы можете просто вызвать parallel()
на Flowable
и возвращает ParallelFlowable
.
Он не так богат, как обычный Flowable
, потому что параллелизм порождает много проблем с Rx-контрактами, но у вас есть basic map()
, filter()
и многие другие, которых в большинстве случаев должно быть достаточно.
Так что вместо этого потока от @LordRaydenMK отвечу
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
Теперь вы можете сделать:
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
.runOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.sequential()
.subscribe(val -> System.out.println(val));
Ответ 3
Для этой цели вам нужно указать subscribeOn(Schedulers.computation())
вместо observeOn(Schedulers.computation())
.
В subscribeOn
вы объявляете, в каком потоке вы собираетесь испускать свои значения.
В observeOn
вы объявляете, в каком потоке вы собираетесь обрабатывать и наблюдать за ними.
Ответ 4
Использование flatMap
и указать, чтобы подписаться на Schedulers.computation()
достигнет concurrency.
Вот более практичный пример с использованием Callable
, с выхода, мы видим, что для завершения всех задач потребуется около 2000 миллисекунд.
static class MyCallable implements Callable<Integer> {
private static final Object CALLABLE_COUNT_LOCK = new Object();
private static int callableCount;
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount++;
}
}
public static int getCallableCount() {
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount;
}
}
}
private static void runMyCallableConcurrentlyWithRxJava() {
long startTimeMillis = System.currentTimeMillis();
final Semaphore semaphore = new Semaphore(1);
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
.flatMap(new Function<MyCallable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext " + o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
if (MyCallable.getCallableCount() >= 4) {
semaphore.release();
}
}
});
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis));
}
Ответ 5
Это все равно происходит в той же последовательности. Даже на новых потоках
Наблюдаемый ob3 = Observable.range(1, 5);
ob3.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer pArg0) {
return Observable.just(pArg0);
}
}).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer pArg0) {
try {
Thread.sleep(1000 - (pArg0 * 100));
System.out.println(pArg0 + " ccc " + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return pArg0;
}
}).subscribe();
Выход
1 ccc RxNewThreadScheduler-1
2 ccc RxNewThreadScheduler-1
3 ccc RxNewThreadScheduler-1
4 ccc RxNewThreadScheduler-1
5 ccc RxNewThreadScheduler-1