RxJs: опрос до тех пор, пока интервал не будет выполнен или не будут получены правильные данные
Как выполнить следующий сценарий в браузере с помощью RxJs:
- отправить данные в очередь для обработки
- вернуть идентификатор задания
- опросить другую конечную точку каждые 1 с до тех пор, пока не будет достигнут результат или не пройдет 60 секунд (затем провалится)
Промежуточное решение, с которым я столкнулся:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable
.interval(1000)
.delay(5000)
.map(_ => jobQueueData.jobId)
.take(55)
)
.flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
.filter(result => result.completed)
.subscribe(
result => console.log('Result', result),
error => console.log('Error', error)
);
- Есть ли способ без промежуточных переменных остановить таймер после поступления данных или возникновения ошибки? Теперь я могу представить новые наблюдаемые, а затем использовать
takeUntil
- Использование
flatMap
здесь семантически корректно? Возможно, все это должно быть переписано и не привязано цепью с помощью flatMap
?
Ответы
Ответ 1
Начиная с верха, у вас есть обещание превратиться в наблюдаемое. Как только это даст значение, вы хотите сделать звонок один раз в секунду, пока не получите определенный ответ (успех) или пока не пройдет определенное количество времени. Мы можем сопоставить каждую часть этого объяснения с методом Rx:
"Как только это дает значение" = map
/flatMap
(flatMap
в этом случае, потому что то, что будет дальше, также будет наблюдаемым, и нам нужно их сгладить)
"один раз в секунду" = interval
"получить определенный ответ" = filter
"или" = amb
"определенное количество времени прошло" = timer
Оттуда мы можем собрать это вместе так:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.amb(
Rx.Observable.timer(60000)
.flatMap(() => Rx.Observable.throw(new Error('Timeout')))
)
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
Как только мы получим наш первоначальный результат, мы прогнозируем это в гонке между двумя наблюдаемыми, которая даст значение, когда получит успешный ответ, и тот, который даст значение, когда пройдет определенное количество времени, Второй flatMap
существует потому, что .throw
отсутствует в наблюдаемых экземплярах, а метод на Rx.Observable
возвращает наблюдаемый, который также должен быть сплющен.
Оказывается, что комбинация amb
/timer
может быть фактически заменена на timeout
, например:
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.filter(x => x.completed)
.take(1)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
)
;
Я пропустил .delay
, который у вас был в вашем примере, поскольку он не был описан в вашей желаемой логике, но он может быть установлен тривиально для этого решения.
Итак, чтобы прямо ответить на ваши вопросы:
- В приведенном выше коде нет необходимости вручную останавливать что-либо, так как
interval
будет удаляться, как только количество подписчиков упадет до нуля, что произойдет либо при take(1)
, либо amb
/timeout
завершает.
- Да, оба варианта использования в вашем оригинале были действительными, так как в обоих случаях вы проецировали каждый элемент наблюдаемого на новый наблюдаемый и хотели сгладить результирующее наблюдаемое наблюдение за регулярным наблюдаемым.
Здесь jsbin Я собрал вместе, чтобы протестировать решение (вы можете настроить значение, возвращаемое в pollQueueForResult
, чтобы получить желаемый успех/таймаут, раз были разделены на 10 для быстрого тестирования).
Ответ 2
Небольшая оптимизация для отличного ответа от @matt-burnell. Вы можете заменить операторы фильтр и на оператором первым следующим образом
Rx.Observable
.fromPromise(submitJobToQueue(jobData))
.flatMap(jobQueueData =>
Rx.Observable.interval(1000)
.flatMap(() => pollQueueForResult(jobQueueData.jobId))
.first(x => x.completed)
.map(() => 'Completed')
.timeout(60000, Rx.Observable.throw(new Error('Timeout')))
)
.subscribe(
x => console.log('Result', x),
x => console.log('Error', x)
);
Кроме того, для людей, которые могут не знать, оператор flatMap является псевдонимом для mergeMap в RxJS 5.0.
Ответ 3
Не твой вопрос, но мне нужен такой же функционал
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMapTo } from 'rxjs/operators'
const defaultMaxWaitTimeMilliseconds = 5 * 1000
function isAsyncThingSatisfied(result) {
return true
}
export function doAsyncThingSeveralTimesWithTimeout(
doAsyncThingReturnsPromise,
maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
checkEveryMilliseconds = 500,
) {
const subject$ = race(
interval(checkEveryMilliseconds).pipe(
mergeMap(() => doAsyncThingReturnsPromise()),
takeWhileInclusive(result => isAsyncThingSatisfied(result)),
),
of(null).pipe(
delay(maxWaitTimeMilliseconds),
switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
)
)
return subject$.toPromise(Promise) // will return first result satistieble result of doAsyncThingReturnsPromise or throw error on timeout
}
пример
// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { catchError, timeout, mergeMap, delay, switchMap } from 'rxjs/operators'
const defaultMaxWaitTimeMilliseconds = 5 * 1000
export function mailhogWaitForNEmails(
mailhogClient,
numberOfExpectedEmails,
maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
checkEveryMilliseconds = 500,
) {
let tries = 0
const mails$ = race(
interval(checkEveryMilliseconds).pipe(
mergeMap(() => mailhogClient.getAll()),
takeWhileInclusive(mails => {
tries += 1
return mails.total < numberOfExpectedEmails
}),
),
of(null).pipe(
delay(maxWaitTimeMilliseconds),
switchMap(() => throwError('mailhogWaitForNEmails timeout after ${tries} tries'))
)
)
// toPromise returns promise which contains the last value from the Observable sequence.
// If the Observable sequence is in error, then the Promise will be in the rejected stage.
// If the sequence is empty, the Promise will not resolve.
return mails$.toPromise(Promise)
}
// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'
export async function mailhogWaitForEmailAndClean(mailhogClient) {
const mails = await mailhogWaitForNEmails(mailhogClient, 1)
if (mails.count !== 1) {
throw new Error(
'Expected to receive 1 email, but received ${mails.count} emails',
)
}
await mailhogClient.deleteAll()
return mails.items[0]
}