Как использовать RxJava Interval Operator
Я изучаю оператор RxJava, и я нашел этот код ниже, ничего не печатал:
public static void main(String[] args) {
Observable
.interval(1, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError -> " + e.getMessage());
}
@Override
public void onNext(Long l) {
System.out.println("onNext -> " + l);
}
});
}
Как ReactiveX, interval
создать наблюдаемое, которое испускает последовательность целых чисел, разделенных определенным интервалом времени
Я сделал ошибку или забыл о чем-то?
Ответы
Ответ 1
Вы должны блокировать до тех пор, пока не будет потреблено наблюдаемое:
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Observable
.interval(1, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
// make sure to complete only when observable is done
latch.countDown();
}
@Override
public void onError(Throwable e) {
System.out.println("onError -> " + e.getMessage());
}
@Override
public void onNext(Long l) {
System.out.println("onNext -> " + l);
}
});
// wait for observable to complete (never in this case...)
latch.await();
}
Вы можете добавить .take(10)
, например, чтобы увидеть наблюдаемое завершено.
Ответ 2
Положите Thread.sleep(1000000)
после подписания, и вы увидите, что он работает. Observable.interval
работает по умолчанию на Schedulers.computation()
, поэтому ваш поток запускается в потоке, отличном от основного потока.
Ответ 3
Как они говорят, что уже интервал работает асинхронно, поэтому вам нужно дождаться завершения всех событий.
Вы можете получить подписку после подписания, а затем использовать TestSubcriber, который является частью платформы reactiveX, и который даст вам возможность ждать завершения всех событий.
@Test
public void testObservableInterval() throws InterruptedException {
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.map(time-> "item emitted")
.subscribe(System.out::print,
item -> System.out.print("final:" + item));
new TestSubscriber((Observer) subscription)
.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}
В моем github больше примеров, если вам нужно https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
Ответ 4
Просто добавьте в конце кода System.in.read()