Ответ 1
Observable<T>
представляет собой монаду, связанную операцию, а не выполнение самой операции. Это описательный язык, а не императив, к которому вы привыкли. Для выполнения операции вам .subscribe()
. Каждый раз, когда вы подписываете новый поток выполнения, создается с нуля. Не путайте потоки с потоками, так как подписка выполняется синхронно, если вы не указали изменение потока с помощью .subscribeOn()
или .observeOn()
. Вы связываете новые элементы с любой существующей операцией /monad/Observable, чтобы добавить новое поведение, например, изменение потоков, фильтрацию, накопление, преобразование и т.д. В случае ваша наблюдаемая дорогая операция, которую вы не хотите повторять при каждой подписке, вы можете предотвратить отдых, используя .cache()
.
Чтобы сделать любую асинхронную/синхронную операцию Observable<T>
в синхронной встроенной, используйте .toBlocking()
, чтобы изменить ее тип на BlockingObservable<T>
, Вместо .subscribe()
он содержит новые методы для выполнения операций над каждым результатом с помощью .forEach()
или принуждения с помощью .first()
Обсерватории - хороший инструмент, потому что они в основном * детерминированы (одни и те же входы всегда дают одинаковые результаты, если вы не делаете что-то неправильно), многоразовые (вы можете отправлять их как часть шаблона команды/политики) и для большая часть игнорирует совпадение, потому что они не должны полагаться на общее состояние (что-то не так). BlockingObservables хороши, если вы пытаетесь принести библиотеку с наблюдаемыми на императивный язык или просто выполняете операцию над Observable, которой у вас есть 100% уверенность в том, что она хорошо управляется.
Архитектура вашего приложения вокруг этих принципов - это изменение парадигмы, на которое я действительно не могу ответить на этот ответ.
* Существуют такие нарушения, как
Subject
иObservable.create()
, которые необходимы для интеграции с императивными рамками.