Как работает отмена задания в RxJava?

Я не понимаю, как реализовать отмену задачи в RXJava.

Я заинтересован в переносе существующего API, созданного с использованием Guava ListenableFuture. Мой вариант использования выглядит следующим образом:

  • У меня есть одна операция, состоящая из последовательности фьючерсов, соединенных Futures.transform()
  • Несколько подписчиков соблюдают окончательное будущее операции.
  • Каждый наблюдатель может отменить окончательное будущее, и все наблюдатели будут свидетелями события отмены.
  • Отмена окончательного будущего приводит к отмене его зависимостей, например. в последовательности 123, отмена 3 распространяется до 2 и т.д.

В RxJava очень мало информации об этом; единственные ссылки, которые я могу найти для отмены, упоминают Subscription как эквивалент .NET Disposable, но, насколько я вижу, Subscription предлагает возможность отказаться от подписки на последующие значения в последовательности.

Я не понимаю, как реализовать "любой абонент может отменить" семантику через этот API. Думаю ли я об этом неправильно?

Любой ввод будет оценен.

Ответы

Ответ 1

Важно узнать о Cold vs Hot Observables. Если ваши наблюдатели холодны, то их операции не будут выполняться, если у вас нет подписчиков. Следовательно, чтобы "отменить" , просто убедитесь, что все Наблюдатели отписываются от источника Observable.

Однако, если только один Наблюдатель источника не подписывается, и есть другие Наблюдатели, все еще подписавшиеся на источник, это не приведет к "отмене". В этом случае вы можете использовать (но это не единственное решение) ConnectableObservables. Также см. эту ссылку о Rx.NET.

Практический способ использования ConnectableObservables - просто вызвать .publish().refCount() на любом холодном наблюдаемом. То, что это делает, создает один единственный "прокси" Observer, который передает события от источника к фактическим Наблюдателям. Прокси-сервер Observer отменяет подписку, когда последний фактический Observer не подписывается.

Чтобы вручную управлять ConnectableObservable, вызовите только coldSource.publish(), и вы получите экземпляр ConnectableObservable. Затем вы можете вызвать .connect(), который вернет вам подписку "прокси" Observer. Чтобы вручную "отменить" источник, вы просто отмените подписку на прокси-наблюдатель.


Для вашей конкретной проблемы вы также можете использовать оператор .takeUntil().

Предположим, что ваше "окончательное будущее" портировано как finalStream в RxJava, и предположим, что "отменить события" являются Observables cancelStream1, cancelStream2 и т.д., тогда становится довольно просто "отменить" операции в результате finalStream:

Observable<FooBar> finalAndCancelableStream = finalStream
    .takeUntil( Observable.merge(cancelStream1, cancelStream2) );

В диаграммах вот как работает takeUntil, а это то, как работает слияние.

На простом английском языке вы можете прочитать его как "finalAndCancelableStream - это finalStream, пока ни отменитьStream1, ни cancelStream2 не выпустить событие".