Последовательность RxJS эквивалентна обещанию .then()?
Раньше я много развивался с обещанием, и теперь я перехожу к RxJS. Документ RxJS не дает очень четкого примера того, как перейти от последовательности обещаний к последовательности наблюдателей.
Например, я обычно пишу цепочку обещаний с несколькими шагами, например
// a function that returns a promise
getPromise()
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.then(function(result) {
// do something
})
.catch(function(err) {
// handle error
});
Как мне переписать эту цепочку обещаний в стиле RxJS?
Ответы
Ответ 1
Для потока данных (эквивалентно then
):
Rx.Observable.fromPromise(...)
.flatMap(function(result) {
// do something
})
.flatMap(function(result) {
// do something
})
.subscribe(function onNext(result) {
// end of chain
}, function onError(error) {
// process the error
});
Обещание может быть преобразовано в наблюдаемое с Rx.Observable.fromPromise
.
Некоторые операторы обещаний имеют прямой перевод. Например, RSVP.all
или jQuery.when
можно заменить на Rx.Observable.forkJoin
.
Имейте в виду, что у вас есть группа операторов, которая позволяет асинхронно преобразовывать данные и выполнять задачи, которые вы не можете или будет очень трудно сделать с помощью promises. Rxjs показывает все свои возможности с асинхронными последовательностями данных (последовательность, то есть более 1 асинхронного значения).
Для управления ошибками объект немного сложнее.
- есть catch и finally операторы тоже
-
retryWhen
также может помочь повторить последовательность в случае ошибки
- вы также можете иметь дело с ошибками в самом подписчике с помощью функции
onError
.
Для точной семантики более подробно рассмотрите документацию и примеры, которые вы можете найти в Интернете, или задайте здесь конкретные вопросы.
Это определенно станет хорошей отправной точкой для более глубокого управления ошибками с помощью Rxjs: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html
Ответ 2
Более современная альтернатива:
import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';
fromPromise(...).pipe(
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
flatMap(result => {
// do something
}),
catchError(error => {
// handle error
})
)
Также обратите внимание, что для того, чтобы все это работало, вам нужно где-то subscribe
на этот канал Observable
, но я предполагаю, что он обрабатывается в какой-то другой части приложения.
Ответ 3
Обновление мая 2019 года с использованием RxJs 6
Согласитесь с приведенными выше ответами, хотел бы добавить конкретный пример с некоторыми игрушечными данными и простыми обещаниями (с setTimeout), используя RxJs v6 для большей ясности.
Просто обновите переданный идентификатор (в настоящее время жестко запрограммированный как 1
) на что-то, что не существует, чтобы выполнить логику обработки ошибок тоже. Важно отметить, что также отметить использование of
с catchError
сообщения.
import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";
const posts = [
{ title: "I love JavaScript", author: "Wes Bos", id: 1 },
{ title: "CSS!", author: "Chris Coyier", id: 2 },
{ title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];
const authors = [
{ name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
{
name: "Chris Coyier",
twitter: "@chriscoyier",
bio: "CSS Tricks and CodePen"
},
{ name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];
function getPostById(id) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const post = posts.find(post => post.id === id);
if (post) {
console.log("ok, post found!");
resolve(post);
} else {
reject(Error("Post not found!"));
}
}, 200);
});
}
function hydrateAuthor(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
const authorDetails = authors.find(person => person.name === post.author);
if (authorDetails) {
post.author = authorDetails;
console.log("ok, post hydrated with author info");
resolve(post);
} else {
reject(Error("Author not Found!"));
}
}, 200);
});
}
function dehydratePostTitle(post) {
return new Promise((resolve, reject) => {
setTimeout(() => {
delete post.title;
console.log("ok, applied transformation to remove title");
resolve(post);
}, 200);
});
}
// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
flatMap(post => {
return hydrateAuthor(post);
}),
flatMap(post => {
return dehydratePostTitle(post);
}),
catchError(error => of('Caught error: ${error}'))
);
source$.subscribe(console.log);
Выходные данные:
ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
{ name: 'Wes Bos',
twitter: '@wesbos',
bio: 'Canadian Developer' },
id: 1 }
Ключевая часть, эквивалентна следующей, использующей простой поток управления обещаниями:
getPostById(1)
.then(post => {
return hydrateAuthor(post);
})
.then(post => {
return dehydratePostTitle(post);
})
.then(author => {
console.log(author);
})
.catch(err => {
console.error(err);
});
Ответ 4
если функция getPromise
находится в середине потокового канала, вы должны просто обернуть ее в одну из функций mergeMap
, switchMap
или concatMap
(обычно mergeMap
):
stream$.pipe(
mergeMap(data => getPromise(data)),
filter(...),
map(...)
).subscribe(...);
если вы хотите начать свой поток с getPromise()
то оберните его from
функции:
import {from} from 'rxjs';
from(getPromise()).pipe(
filter(...)
map(...)
).subscribe(...);
Ответ 5
Насколько я только что узнал, если вы возвращаете результат в flatMap, он преобразует его в массив, даже если вы вернули строку.
Но если вы возвращаете Observable, эта Observable может вернуть строку;
Ответ 6
Если я правильно понял, вы имеете в виду использование значений, и в этом случае вы используете sbuscribe, т.е.
const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );
Кроме того, вы можете просто превратить наблюдаемое в обещание, используя toPromise(), как показано ниже:
arrObservable.toPromise().then()
Ответ 7
Вот как я это сделал.
предварительно
public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
const request = gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(response => {
onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
});
}
// caller:
this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp;
});
После того, как (ют?)
public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
return from(
new Promise((resolve, reject) => {
gapi.client.people.people.connections.list({
resourceName: 'people/me',
pageSize: 100,
personFields: 'phoneNumbers,organizations,emailAddresses,names'
}).then(result => {
resolve(result);
});
})
).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
}));
}
// caller
this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
// handle rsp
}), (error) => {
// handle error
});