RxJS, как опросить API, чтобы постоянно проверять обновленные записи, используя динамическую метку времени
Я новичок в RxJS, и я пытаюсь написать приложение, которое выполнит следующие действия:
- При загрузке сделайте запрос AJAX (поддельный как
fetchItems()
для простоты), чтобы получить список элементов.
- Через секунду после этого сделайте запрос AJAX для получения элементов.
- При проверке новых элементов ТОЛЬКО элементы, которые были изменены после того, как самая последняя временная метка должна быть возвращена.
- Не должно быть никакого состояния, внешнего по отношению к наблюдаемым.
Моя первая попытка была очень прямой и соответствовала целям 1, 2 и 4.
var data$ = Rx.Observable.interval(1000)
.startWith('run right away')
.map(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but
// I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned
return fetchItems();
});
Теперь я взволнован, это было легко, не может быть намного труднее встретить цель 3... через несколько часов это где я нахожусь в:
var modifiedSince = null;
var data$ = Rx.Observable.interval(1000)
.startWith('run right away')
.flatMap(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
return fetchItems(modifiedSince);
})
.do(function(item) {
if(item.updatedAt > modifiedSince) {
modifiedSince = item.updatedAt;
}
})
.scan(function(previous, current) {
previous.push(current);
return previous;
}, []);
Это решает цель 3, но регрессирует по цели 4. Теперь я сохраняю состояние вне наблюдаемого.
Я предполагаю, что глобальный блок modifiedSince
и .do()
не лучший способ выполнить это. Любое руководство будет с большой благодарностью.
EDIT: надеюсь, уточнил, что я ищу с этим вопросом.
Ответы
Ответ 1
Вот еще одно решение, которое не использует замыкание или "внешнее состояние".
Я сделал следующую гипотезу:
-
fetchItems
возвращает Rx.Observable
элементов, т.е. не массив элементов
Он использует оператор expand
, который позволяет испускать значения, которые следуют за рекурсивным соотношением типа x_n+1 = f(x_n)
. Вы передаете x_n+1
, возвращая наблюдаемое, которое испускает это значение, например Rx.Observable.return(x_n+1)
, и вы можете закончить рекурсию, вернув Rx.Observable.empty()
. Здесь кажется, что у вас нет конечного условия, поэтому это будет работать вечно.
scan
также позволяет испускать значения после рекурсивного отношения (x_n+1 = f(x_n, y_n)
). Разница в том, что scan
заставляет вас использовать синхронную функцию (поэтому x_n+1
синхронизируется с y_n
), а при expand
вы можете использовать асинхронную функцию в виде наблюдаемого.
Код не проверен, поэтому держите меня в курсе, если это работает или нет.
Соответствующая документация: expand, combineLatest
var modifiedSinceInitValue = // put your date here
var polling_frequency = // put your value here
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []}
function max(property) {
return function (acc, current) {
acc = current[property] > acc ? current[property] : acc;
}
}
var data$ = Rx.Observable.return(initial_state)
.expand (function(state){
return fetchItem(state.modifiedSince)
.toArray()
.combineLatest(Rx.Observable.interval(polling_frequency).take(1),
function (itemArray, _) {
return {
modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue),
itemArray : itemArray
}
}
})
Ответ 2
Как насчет этого:
var interval = 1000;
function fetchItems() {
return items;
}
var data$ = Rx.Observable.interval(interval)
.map(function() { return fetchItems(); })
.filter(function(x) {return x.lastModified > Date.now() - interval}
.skip(1)
.startWith(fetchItems());
Это должно фильтровать источник только для новых элементов, а также начинать с полной коллекции. Просто напишите функцию фильтра, соответствующую вашему источнику данных.
Или путем передачи аргумента fetchItems:
var interval = 1000;
function fetchItems(modifiedSince) {
var retVal = modifiedSince ? items.filter( function(x) {return x.lastModified > modifiedSince}) : items
return retVal;
}
var data$ = Rx.Observable.interval(interval)
.map(function() { return fetchItems(Date.now() - interval); })
.skip(1)
.startWith(fetchItems());
Ответ 3
Вы, кажется, имеете в виду, что modifiedSince
является частью состояния, которое вы переносите, поэтому оно должно появиться в scan
. Почему бы вам не переместить действие в do
в сканирование тоже?. Тогда ваше семя будет {modifiedSince: null, itemArray: []}
.
Errr, я просто подумал, что это может не сработать, так как вам нужно отправить modifiedSince
обратно в функцию fetchItem
, которая находится вверху. У вас нет цикла здесь? Это означает, что вам придется использовать субъекта для разрыва этого цикла. В качестве альтернативы вы можете попытаться сохранить modifiedSince
в капсуле в закрытии. Что-то вроде
function pollItems (fetchItems, polling_frequency) {
var modifiedSince = null;
var data$ = Rx.Observable.interval(polling_frequency)
.startWith('run right away')
.flatMap(function() {
// `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
return fetchItems(modifiedSince);
})
.do(function(item) {
if(item.updatedAt > modifiedSince) {
modifiedSince = item.updatedAt;
}
})
.scan(function(previous, current) {
previous.push(current);
return previous;
}, []);
return data$;
}
Я должен закончиться, чтобы отпраздновать новый год, если это не сработает, я могу еще раз попробовать (возможно, используя оператор expand
, другую версию scan
).