Как подкласс Observable в RxJava?

Я знаю, что вам следует избегать этого любой ценой, но что, если у меня есть допустимый прецедент для подкласса Observable в RxJava? Является ли это возможным? Как я могу это сделать?

В этом конкретном случае у меня есть класс "репозиторий", который в настоящее время возвращает запросы:

class Request<T> {
    public abstract Object key();
    public abstract Observable<T> asObservable();

    [...]

    public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
        Request<T> self = this;
        return new Request<T>() {
             @Override public Object key() { return self.key; }
             @Override public Observable<T> asObservable() { return transformation.call(self); }
        }
    }
}

Затем я использую метод преобразования для изменения наблюдаемого ответа (asObservable) в контексте, где мне нужен ключ запроса (например, кэширование):

 service.getItemList() // <- returns a Request<List<Item>>
     .transform(r -> r.asObservable()
             // The activity is the current Activity in Android
             .compose(Operators.ensureThereIsAnAccount(activity))
             // The cache comes last because we don't need auth for cached responses
             .compose(cache.cacheTransformation(r.key())))
     .asObservable()
     [...  your common RxJava code ...]

Теперь было бы очень удобно, если бы мой класс Request был подклассом Observable, так как тогда я мог бы удалить все вызовы .asObservable(), и клиенты даже не знали бы о моем классе Request.

Ответы

Ответ 1

Возможно подкласс Observable (мы делаем это для Subject и ConnectableObservable s), но это требует дополнительного рассмотрения, потому что вам нужно передать обратный вызов OnSubscribe для обработки входящих * t23 > s. Мне непонятно, что должен делать ваш запрос в случае, если кто-то подписался на него, поэтому я приведу вам два примера расширения Observable:

Наблюдаемый без общего измененного состояния

Если у вас нет изменяемого состояния, которое будет использоваться между подписчиками, вы можете просто расширить Observable и передать в своем действии значение super

public final class MyObservable extends Observable<Long> {
    public MyObservable() {
        super(new OnSubscribe<Long>() {
            @Override public void call(Subscriber<? super Long> child) {
                child.onNext(System.currentTimeMillis());
                child.onCompleted();
            }
        });
    }
}

Наблюдается с общим измененным состоянием

Это обычно сложнее, потому что вам нужно получить доступ к общему состоянию из метода OnSubscribe и методов Observable, но Java не позволит вам коснуться полей экземпляра из внутреннего класса OnSubscribe до super завершено. Решение состоит в том, чтобы разделить такое разделенное состояние и OnSubscribe от конструктора и использовать статический метод factory для настройки обоих:

public final class MySharedObservable extends Observable<Long> {
    public static MySharedObservable create() {
        final AtomicLong counter = new AtomicLong();
        OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> t1) {
                t1.onNext(counter.incrementAndGet());
                t1.onCompleted();
            }
        };
        return new MySharedObservable(onSubscribe, counter);
    }
    private AtomicLong counter;

    private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
        super(onSubscribe);
        this.counter = counter;
    }
    public long getCounter() {
        return counter.get();
    }
}