Ответ 1
Возможно подкласс Observable (мы делаем это для Subject
и ConnectableObservable
s), но это требует дополнительного рассмотрения, потому что вам нужно передать обратный вызов OnSubscribe
для обработки входящих * t23 > s. Мне непонятно, что должен делать ваш запрос в случае, если кто-то подписался на него, поэтому я приведу вам два примера расширения Observable:
Наблюдаемый без общего измененного состояния
Если у вас нет изменяемого состояния, которое будет использоваться между подписчиками, вы можете просто расширить Observable и передать в своем действии значение super
public final class MyObservable extends Observable<Long> {
public MyObservable() {
super(new OnSubscribe<Long>() {
@Override public void call(Subscriber<? super Long> child) {
child.onNext(System.currentTimeMillis());
child.onCompleted();
}
});
}
}
Наблюдается с общим измененным состоянием
Это обычно сложнее, потому что вам нужно получить доступ к общему состоянию из метода OnSubscribe
и методов Observable, но Java не позволит вам коснуться полей экземпляра из внутреннего класса OnSubscribe
до super
завершено. Решение состоит в том, чтобы разделить такое разделенное состояние и OnSubscribe
от конструктора и использовать статический метод factory для настройки обоих:
public final class MySharedObservable extends Observable<Long> {
public static MySharedObservable create() {
final AtomicLong counter = new AtomicLong();
OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> t1) {
t1.onNext(counter.incrementAndGet());
t1.onCompleted();
}
};
return new MySharedObservable(onSubscribe, counter);
}
private AtomicLong counter;
private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
super(onSubscribe);
this.counter = counter;
}
public long getCounter() {
return counter.get();
}
}