Фильтрация наблюдаемого с использованием значений из другого наблюдаемого

У меня есть два наблюдаемых:

  • Наблюдаемый, представляющий список входов флажков.
  • Наблюдаемое представление потока событий, поступающих с сервера.

Я хотел бы отфильтровать второй наблюдаемый, используя значения из первого.

Значения, полученные с сервера, включают свойство tag, которое соответствует значениям в списке флажков. Наблюдаемое, полученное в результате комбинации указанных выше двух, приведет только к значениям от сервера, чье свойство tag включено в набор отмеченных флажков.

Ответы

Ответ 1

Вы можете использовать withLatestFrom. введите описание изображения здесь.

source.withLatestFrom(checkboxes, (data, checkbox) => ({data, checkbox}))
  .filter(({data, checkbox}) => ...)

Здесь checkboxes является наблюдаемым, представляющим список входов checkbox. source является наблюдаемым, представляющим поток событий, поступающих с сервера. В функции фильтра вы можете проверить, действительно ли данные действительны по сравнению с настройками флажка и разрешить им прохождение.

Обратите внимание, что важно checkboxes испускает не менее 1 значения, прежде чем поток может испустить что-либо.

Ps. Что касается других ответов, это решение работает, даже если источник cold.

Ответ 2

Чтобы фильтровать поток A, используя значения потока B, вам необходимо наблюдать поток B и использовать последние значения для фильтрации потока A.

Используйте switch(), чтобы преобразовать B наблюдаемый в наблюдаемые значения для получения из наблюдаемого A.

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });

Использование switch() предполагает, что dataSource является горячим наблюдаемым.

Пример использования interval() для создания фиктивных данных:

var input,
    checkedInputValuesSource,
    dataSource;

input = document.querySelectorAll('input');

// Generate source describing the current filter.
checkedInputValuesSource = Rx.Observable
    .fromEvent(input, 'change')
    .map(function () {
        var inputs = document.querySelectorAll('input'),
            checkedInputValues = [];
        
        [].forEach.call(inputs, function (e) {
            if (e.checked) {
                checkedInputValues.push(e.value);
            }
        });
        
        return checkedInputValues;
    })
    .startWith([]);

// Generate random data source (hot).
dataSource = Rx.Observable
    .interval(500)
    .map(function () {
        var options = ['a', 'b', 'c'];
    
        return options[Math.floor(Math.floor(Math.random() * options.length))];
    })
    .do(function (x) {
        console.log('in: ' + x);
    })
    .share();

checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>

<input type='checkbox' value='a'>
<input type='checkbox' value='b'>
<input type='checkbox' value='c'>