Ответ 1
Подписаться не принимает async
методы. Итак, что происходит здесь, вы используете механизм "огонь-и-забыть" от async void
. Проблема в том, что onNext сообщения больше не будут сериализованы.
Вместо того, чтобы вызывать метод async
внутри Subscription, вы должны перенести его в конвейер, чтобы позволить Rx ждать его.
Хорошо использовать fire-and-forget on onError
и onCompleted
потому что они гарантированно будут последним, что вызвано из Observable
. onError
в виду, что ресурсы, связанные с Observable
могут быть очищены после onError
и onCompleted
до их завершения.
Я написал небольшой метод расширения, который делает все это:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
.Subscribe(
e => { }, // empty
onError,
onCompleted);
}
Сначала мы преобразуем метод async onNext
в Observable
, async onNext
два асинхронных мира. Это означает, что async/await внутри onNext
будет соблюдаться. Затем мы переносим этот метод в Defer
чтобы он не начинался сразу, когда он был создан. Вместо этого Concat
будет называть следующий отложенный наблюдаемый только тогда, когда предыдущий закончен.
Ps. Я надеюсь, что в следующем выпуске Rx.Net будет поддержка async
в подписке.