Задержка RxJava для каждого элемента списка, испускаемого
Я изо всех сил пытаюсь реализовать то, что, как я предполагал, будет довольно простым в Rx.
У меня есть список предметов, и я хочу, чтобы каждый предмет выдавался с задержкой.
Кажется, что оператор Rx delay() просто смещает излучение всех элементов на указанную задержку, а не каждого отдельного элемента.
Вот некоторый тестовый код. Он группирует элементы в списке. Каждая группа должна затем применить задержку перед отправкой.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Результат:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
Но то, что я ожидал увидеть, выглядит примерно так:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
Что я делаю неправильно?
Ответы
Ответ 1
Один из способов сделать это - использовать zip
, чтобы объединить ваши наблюдаемые с помощью Interval
наблюдаемый для задержки выхода.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Ответ 2
Простейший способ сделать это, похоже, просто использовать concatMap и обернуть каждый элемент в задержанном Obserable.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Отпечатки:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Ответ 3
Просто используйте простой подход для выделения каждого элемента в коллекции с интервалом:
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
Каждый элемент будет выходить каждые 500 миллисекунд
Ответ 4
Для пользователей kotlin я написал функцию расширения для подхода "почтовый индекс с интервалом"
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)
Это работает так же, но это делает его многоразовым. Пример:
Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)
Ответ 5
Вы можете реализовать пользовательский оператор rx, например MinRegularIntervalDelayOperator
, а затем используйте это с помощью функции lift
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Ответ 6
Я думаю, что это именно то, что вам нужно. Взгляни:
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});
Ответ 7
Чтобы ввести задержку между каждым испускаемым элементом, полезно:
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);
Больше хороших вариантов на https://github.com/ReactiveX/RxJava/issues/3505
Ответ 8
Чтобы задержать каждую группу, вы можете изменить свой flatMap()
, чтобы вернуть Observable, который задерживает излучение группы.
Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Ответ 9
Не очень чистый способ заключается в изменении задержки с помощью итерации с помощью оператора .delay(Func1).
Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Ответ 10
Есть и другой способ сделать это, используя concatMap, поскольку concatMap возвращает наблюдаемые исходные элементы. поэтому мы можем добавить задержку на наблюдаемое.
вот что я пробовал.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Ответ 11
Ты можешь использовать
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return aLong.intValue() + 1;
}
})
.startWith(0)
.take(listInput.size())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(TAG, "---index of your list --" + index);
}
});
Этот код выше не дублирует значение (индекс). "Я уверен"
Ответ 12
Я думаю, вы этого хотите:
Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Таким образом, он будет задерживать числа, входящие в группу, а не задерживать сокращенный список на 5 секунд.
Ответ 13
Вы можете добавить задержку между испускаемыми элементами, используя flatMap, maxConcurrent и delay()
Вот пример - испустите 0..4 с задержкой
@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5
val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0
Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})
latch.await()
assertTrue { endMoment - startMoment >= DELAY * COUNT }
}
... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547
Ответ 14
вы сможете достичь этого, используя оператор Timer
. Я попытался с delay
, но не смог добиться желаемого результата. Обратите внимание, что вложенные операции выполняются в операторе flatmap
.
Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));
Ответ 15
Observable.just("A", "B", "C", "D", "E", "F")
.flatMap { item -> Thread.sleep(2000)
Observable.just( item ) }
.subscribe { println( it ) }