RxJS несколько подписей на Observable?

Я создаю свои собственные наблюдаемые и подписанные две функции. Я ожидал бы, что обе функции выполняются для каждого элемента в последовательности, но только последняя.

let observer = null
const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
})

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

Ожидаемый результат

a: TEST
b: TEST

Фактический выход

b: TEST

Здесь JSBin: http://jsbin.com/cahoyey/edit?js,console

Почему это? Как я могу добавить несколько функций к одному наблюдаемому?

Ответы

Ответ 1

Чтобы несколько функций подписывались на один Наблюдаемый, просто подпишите их к наблюдаемому, это так просто. И на самом деле то, что вы сделали.

НО ваш код не работает, потому что после notificationArrayStream.subscribe((x) => console.log('b: ' + x)) выполняется, observer is (x) => console.log('b: ' + x)), поэтому observer.next даст вам b: TEST.

Так что в основном это ваше наблюдаемое творение, которое не так. В create вы передали наблюдателя в качестве параметра, чтобы передать его значения. Те ценности, которые вам нужно каким-то образом создать по вашей собственной логике, но поскольку вы можете видеть, что ваша логика здесь ошибочна. Я бы рекомендовал вам использовать тему, если вы хотите передать значения наблюдателю.

Что-то вроде:

const notificationArrayStream = Rx.Observable.create(function (obs) {
  mySubject.subscribe(obs);
  return () => {}
})

function trigger(something) {
  mySubject.next(something)
}

Ответ 2

Предмет

В вашем случае вы можете просто использовать Subject. Субъект позволяет вам разделить одно исполнение с несколькими наблюдателями при использовании его в качестве прокси для группы подписчиков и источника.

В сущности, здесь ваш пример с использованием темы:

const subject = new Subject();

function trigger(something) {
    subject.next(something);
}

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

trigger('TEST');

Результат:

a: TEST
b: TEST

Pitfall: наблюдатели, прибывающие слишком поздно

Обратите внимание, что время, когда вы подписываетесь и когда вы передаете данные, имеет значение. Если вы отправляете широковещательную рассылку перед подпиской, вы не получаете уведомление этой трансляцией:

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Результат: (пусто)


ReplaySubject & BehaviorSubject

Если вы хотите, чтобы даже будущие подписчики получали уведомление, вы можете использовать ReplaySubject или BehaviorSubject.

Здесь будет ReplaySubject пример использования ReplaySubject (с размером кеша 5, что означает до 5 значений из прошлого, в отличие от объекта BehaviorSubject, который может помнить только последнее значение):

const subject = new ReplaySubject(5); // buffer size is 5

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Результат:

a: TEST
b: TEST

Ответ 3

Каждый раз, когда вы подписываетесь, вы переопределяете наблюдателя var.

Функция триггера ссылается только на один var, поэтому нет ничего удивительного, что есть только один журнал.

Если мы создадим массив var, он будет работать по назначению: JS Bin

let obs = [];

let foo = Rx.Observable.create(function (observer) {
  obs.push(observer);
});

function trigger(sth){
//   console.log('trigger fn');
  obs.forEach(ob => ob.next(sth));
}

foo.subscribe(function (x) {
  console.log('a:${x}');
});
foo.subscribe(function (y) {
  console.log('b:${y}');
});

trigger(1);
trigger(2);
trigger(3);
trigger(4);

Более чистым решением было бы использовать Субъект, как было предложено выше.

Ответ 4

Вы можете создать класс обложки под подпиской <> на основе ReplaySubject. Он был бы более чистым, чем управление субъектом и наблюдаемым:

export class Subscribable<T> {

    private valueSource: Subject = new ReplaySubject(1);
    public value: Observable;
    private _value: T;

    constructor() {
        this.value = this.valueSource.asObservable();
    }

    public set(val: T) {
        this.valueSource.next(val);
        this._value = val;
    }

    public get(): T {
        return this._value;
    }
}

Применение:

let arrayStream : Subscribable<TYPE> = new Subscribable<TYPE>();

…
public setArrayStream (value: TYPE) {
    this.set(value);
}

Изменение значения рукоятки:

arrayStream.value.subscribe(res => { /*handle it*/ });

Оригинальная статья: http://devinstance.net/articles/20170921/rxjs-scribcribable

Ответ 5

Вместо использования темы также можно использовать комманду publishReplay() + refCount(), чтобы можно было наблюдать многоадресную рассылку нескольким абонентам:

const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
}).pipe(publishReplay(), refCount())