Наблюдаемый против Flowable rxJava2
Я смотрел новый rx java 2, и я не совсем уверен, что я больше понимаю идею backpressure
...
Я знаю, что у нас есть Observable
, у которого нет поддержки backpressure
и Flowable
, которая имеет его.
Итак, на примере, скажем, у меня Flowable
с interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Это приведет к сбою после 128 значений, и это довольно очевидно, что я потребляю медленнее, чем получение предметов.
Но тогда мы имеем то же самое с Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Это не сработает вообще, даже если я позабочусь о его потреблении, он все равно работает. Чтобы сделать работу Flowable
, скажем, я положил оператор onBackpressureDrop
, сбой исчез, но не все значения также испускаются.
Итак, базовый вопрос, который я не могу найти в моей голове, почему я должен заботиться о backpressure
, когда я могу использовать plain Observable
, все равно получать все значения без управления buffer
? Или, может быть, с другой стороны, какие преимущества делают backpressure
в пользу управления и обработки потребления?
Ответы
Ответ 1
Какое противодавление проявляется на практике - это ограниченные буферы, Flowable.observeOn
имеет буфер из 128 элементов, который сливается так быстро, как может его принять. Вы можете увеличить размер этого буфера индивидуально, чтобы обрабатывать пакетный источник, и все методы управления противодавлением по-прежнему применяются с 1.x. Observable.observeOn
имеет неограниченный буфер, который продолжает собирать элементы, и ваше приложение может закончиться без памяти.
Вы можете использовать Observable
, например:
- обработка графических интерфейсов
- работа с короткими последовательностями (всего менее 1000 элементов)
Вы можете использовать Flowable
, например:
- холодные и несрочные источники
- генераторы, подобные источникам
- устройства доступа к сети и базы данных
Ответ 2
Обратное давление - это когда ваш наблюдаемый (издатель) создает больше событий, чем может выдержать ваш подписчик. Таким образом, вы можете получить подписчиков пропущенных событий, или вы можете получить огромную очередь событий, которая в конечном итоге просто приводит к нехватке памяти. Flowable
принимает противодавление. Observable
нет. Это оно.
это напоминает мне о воронке, которая, когда у нее слишком много жидкости, переливается. Текучий может помочь не допустить этого:
с огромным противодавлением:
![enter image description here]()
но при использовании текучего противодавления намного меньше:
![enter image description here]()
Rxjava2 имеет несколько стратегий противодавления, которые вы можете использовать в зависимости от вашего варианта использования. под стратегией я подразумеваю, что Rxjava2 предоставляет способ обработки объектов, которые не могут быть обработаны из-за переполнения (противодавления).
вот стратегии. Я не буду проходить их все, но, например, если вы не хотите беспокоиться о переполненных элементах, вы можете использовать такую стратегию отбрасывания:
observable.toFlowable(BackpressureStrategy.DROP)
Насколько я знаю, в очереди должно быть ограничение в 128 элементов, после чего может быть переполнение (обратное давление). Даже если его не 128, это близко к этому числу. Надеюсь, это кому-нибудь поможет.
если вам нужно изменить размер буфера с 128, похоже, это можно сделать так (но следите за любыми ограничениями памяти:
myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.
В разработке программного обеспечения обычно стратегия противодействия означает, что вы говорите эмитенту немного замедлиться, так как потребитель не может справиться со скоростью ваших излучающих событий.
Ответ 3
Тот факт, что ваш Flowable
разбился после испускания 128 значений без обработки обратного давления, не означает, что он всегда будет терпеть крах после 128 значений: иногда он вылетает после 10, и иногда он вообще не падает. Я считаю, что это произошло, когда вы попробовали пример с Observable
- там не было противодавления, поэтому ваш код работал нормально, в следующий раз он может и не быть. Разница в RxJava 2 заключается в том, что больше нет понятия противодавления в Observable
и нет способа справиться с этим. Если вы разрабатываете реактивную последовательность, которая, вероятно, потребует явной обработки противодавления, тогда Flowable
- ваш лучший выбор.