RxJava Observable получает уведомление о первой эмиссии
У меня есть три Observables, которые я объединяю с combLastest:
Observable<String> o1 = Observable.just("1");
Observable<String> o2 = Observable.just("2");
Observable<String> o3 = Observable.just("3");
Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
@Override
public Object call(String s, String s2, String s3) {
return null;
}
});
Я хочу получать уведомления о первой эмиссии одной из наблюдаемых, не игнорируя более поздние эмиссии, что, я думаю, первый оператор сделал бы. Есть ли удобный оператор для этого типа (пример):
o1.doOnFirst(new Func1<String, Void>() {
@Override
public Void call(String s) {
return null;
}
})
Ответы
Ответ 1
Я думаю, что вы можете иметь практический doOnFirst
с простым take
, если вы обрабатываете поток:
public static <T> Observable<T> withDoOnFirst(Observable<T> source, Action1<T> action) {
return source.take(1).doOnNext(action).concatWith(source);
}
Таким образом, действие привязывается только к первому элементу.
Это может быть изменено для обработки наблюдаемых элементов не, поддерживаемых потоками, добавляя skip
, чтобы пропустить уже принятые элементы:
public static <T> Observable<T> withDoOnFirstNonStream(Observable<T> source, Action1<T> action) {
return source.take(1).doOnNext(action).concatWith(source.skip(1));
}
Ответ 2
Есть несколько решений, о которых я могу думать.
Первый - уродливый, но простой взлом doOnNext. Просто добавьте булевское поле в Action1
, указав, был ли получен первый элемент. Получив, сделайте все, что вы хотите, и переверните логическое значение. Например:
Observable.just("1").doOnNext(new Action1<String>() {
boolean first = true;
@Override
public void call(String t) {
if (first) {
// Do soemthing
first = false;
}
}
});
Второй - дважды подписаться на наблюдаемый, который вы хотите отслеживать, используя publish
или share()
, причем одна из этих публикаций проходит через first
(в зависимости от того, хотите ли вы вручную подключиться к опубликованному наблюдаемому), В результате вы получите две отдельные наблюдаемые данные, которые излучают одни и те же предметы, только первая будет остановлена после первой эмиссии:
ConnectableObservable<String> o1 = Observable.just("1").publish();
o1.first().subscribe(System.out::println); //Subscirbed only to the first item
o1.subscribe(System.out::println); //Subscirbed to all items
o1.connect(); //Connect both subscribers
Ответ 3
Используя rxjava-extras:
observable
.compose(Transformers.doOnFirst(System.out::println))
Модуль тестировался и под крышкой просто использует счетчик для подписки в операторе. Обратите внимание, что для каждой подписки важно, так как существует множество случаев использования, когда наблюдаемый экземпляр используется более одного раза, и мы хотим, чтобы оператор doOnFirst
применялся каждый раз.
Исходный код здесь.
Ответ 4
Для удобства я создал эти функции расширения для Flowable
и Observable
.
Обратите внимание, что с doOnFirst()
действие будет вызвано до выброса первого элемента, в то время как doAfterFirst()
сначала издаст первый элемент, а затем выполнит действие.
fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
take(1)
.doOnNext { onFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
take(1)
.doOnNext { onFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
take(1)
.doAfterNext { afterFirstAction.invoke(it) }
.concatWith(skip(1))
fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
take(1)
.doAfterNext { afterFirstAction.invoke(it) }
.concatWith(skip(1))
Использование так просто:
Flowable.fromArray(1, 2, 3)
.doOnFirst { System.err.println("First $it") }
.subscribe { println(it) }
Выход:
// First 1
// 1
// 2
// 3
А:
Flowable.fromArray(1, 2, 3)
.doAfterFirst { System.err.println("First $it") }
.subscribe { println(it) }
Выход:
// 1
// First 1
// 2
// 3