Ответ 1
И Джеймс, и Ричард сделали несколько хороших моментов, но я не думаю, что они дали вам лучший метод для решения вашей проблемы.
Джеймс предложил использовать .Catch(Observable.Never<Unit>())
. Он был не прав, когда сказал, что "будет... позволить потоку продолжать", потому что, как только вы попадаете в исключение, поток должен закончиться - это то, что Ричард указал, когда он упомянул контракт между наблюдателями и наблюдаемыми.
Кроме того, использование Never
таким образом приведет к тому, что ваши наблюдаемые никогда не будут завершены.
Короткий ответ заключается в том, что .Catch(Observable.Empty<Unit>())
- это правильный способ изменить последовательность из последовательности, заканчивающейся ошибкой, с завершением завершения.
Вы выбрали правильную идею использования SelectMany
для обработки каждого значения исходной коллекции, чтобы вы могли поймать каждое исключение, но у вас осталось несколько проблем.
Вы используете задачи (TPL), чтобы превратить вызов функции в наблюдаемый. Это заставляет наблюдаемые использовать потоки пула задач, что означает, что оператор SelectMany
, скорее всего, приведет к значениям в недетерминированном порядке.
Также вы скрываете фактические вызовы для более точной обработки рефакторинга и обслуживания данных.
Думаю, вам лучше создать метод расширения, позволяющий пропускать исключения. Вот он:
public static IObservable<R> SelectAndSkipOnException<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return
source
.Select(t =>
Observable.Start(() => selector(t)).Catch(Observable.Empty<R>()))
.Merge();
}
С помощью этого метода вы можете просто сделать это:
var result =
collection.ToObservable()
.SelectAndSkipOnException(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
});
Этот код намного проще, но он скрывает исключение (-ы). Если вы хотите зависать с исключениями, позволяя продолжить свою последовательность, вам нужно сделать дополнительную забаву. Добавление нескольких перегрузок в метод расширения Materialize
работает, чтобы сохранить ошибки.
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<T> source, Func<T, R> selector)
{
return source.Select(t => Notification.CreateOnNext(t)).Materialize(selector);
}
public static IObservable<Notification<R>> Materialize<T, R>(
this IObservable<Notification<T>> source, Func<T, R> selector)
{
Func<Notification<T>, Notification<R>> f = nt =>
{
if (nt.Kind == NotificationKind.OnNext)
{
try
{
return Notification.CreateOnNext<R>(selector(nt.Value));
}
catch (Exception ex)
{
ex.Data["Value"] = nt.Value;
ex.Data["Selector"] = selector;
return Notification.CreateOnError<R>(ex);
}
}
else
{
if (nt.Kind == NotificationKind.OnError)
{
return Notification.CreateOnError<R>(nt.Exception);
}
else
{
return Notification.CreateOnCompleted<R>();
}
}
};
return source.Select(nt => f(nt));
}
Эти методы позволяют вам написать следующее:
var result =
collection
.ToObservable()
.Materialize(t =>
{
var a = DoA(t);
var b = DoB(a);
var c = DoC(b);
return c;
})
.Do(nt =>
{
if (nt.Kind == NotificationKind.OnError)
{
/* Process the error in `nt.Exception` */
}
})
.Where(nt => nt.Kind != NotificationKind.OnError)
.Dematerialize();
Вы можете даже объединить эти методы Materialize
и использовать ex.Data["Value"]
и ex.Data["Selector"]
, чтобы получить значение и функцию выбора, которые вывели ошибку.
Надеюсь, это поможет.