Ответ 1
Важно узнать о Cold vs Hot Observables. Если ваши наблюдатели холодны, то их операции не будут выполняться, если у вас нет подписчиков. Следовательно, чтобы "отменить" , просто убедитесь, что все Наблюдатели отписываются от источника Observable.
Однако, если только один Наблюдатель источника не подписывается, и есть другие Наблюдатели, все еще подписавшиеся на источник, это не приведет к "отмене". В этом случае вы можете использовать (но это не единственное решение) ConnectableObservables. Также см. эту ссылку о Rx.NET.
Практический способ использования ConnectableObservables - просто вызвать .publish().refCount()
на любом холодном наблюдаемом. То, что это делает, создает один единственный "прокси" Observer, который передает события от источника к фактическим Наблюдателям. Прокси-сервер Observer отменяет подписку, когда последний фактический Observer не подписывается.
Чтобы вручную управлять ConnectableObservable, вызовите только coldSource.publish()
, и вы получите экземпляр ConnectableObservable. Затем вы можете вызвать .connect()
, который вернет вам подписку "прокси" Observer. Чтобы вручную "отменить" источник, вы просто отмените подписку на прокси-наблюдатель.
Для вашей конкретной проблемы вы также можете использовать оператор .takeUntil()
.
Предположим, что ваше "окончательное будущее" портировано как finalStream
в RxJava, и предположим, что "отменить события" являются Observables cancelStream1
, cancelStream2
и т.д., тогда становится довольно просто "отменить" операции в результате finalStream
:
Observable<FooBar> finalAndCancelableStream = finalStream
.takeUntil( Observable.merge(cancelStream1, cancelStream2) );
В диаграммах вот как работает takeUntil, а это то, как работает слияние.
На простом английском языке вы можете прочитать его как "finalAndCancelableStream - это finalStream, пока ни отменитьStream1, ни cancelStream2 не выпустить событие".