Как реализовать IObserver с помощью методов async/awend OnNext/OnError/OnCompleted?

Я пытаюсь написать метод расширения для System.Net.WebSocket, который превратит его в IObserver с помощью Reactive Extensions (Rx.NET). Вы можете увидеть код ниже:

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
    // Wrap the web socket in an interface that a little easier to manage
    var webSocketMessageStream = new WebSocketMessageStream(webSocket);

    // Create the output stream to the client
    return Observer.Create<T>(
        onNext:      async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
        onError:     async error   => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
        onCompleted: async ()      => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
    );
}

Этот код работает, но меня беспокоит использование async/await внутри onNext, onError и onCompleted lambdas. Я знаю, что это возвращает асинхронную lambda, которая нахмурилась (и иногда вызывает проблемы, с которыми я уже столкнулся).

Я читал документацию Rx.NET, а также сообщения в блогах по всему Интернету, и я не могу найти подходящий способ (если есть) использовать метод async/await в IObserver. Есть ли правильный способ сделать это? Если нет, то как я могу обойти эту проблему?

Ответы

Ответ 1

Подписаться не принимает async методы. Итак, что происходит здесь, вы используете механизм "огонь-и-забыть" от async void. Проблема в том, что onNext сообщения больше не будут сериализованы.

Вместо того, чтобы вызывать метод async внутри Subscription, вы должны перенести его в конвейер, чтобы позволить Rx ждать его.

Хорошо использовать fire-and-forget on onError и onCompleted потому что они гарантированно будут последним, что вызвано из Observable. onError в виду, что ресурсы, связанные с Observable могут быть очищены после onError и onCompleted до их завершения.

Я написал небольшой метод расширения, который делает все это:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
            .Subscribe(
            e => { }, // empty
            onError,
            onCompleted);
    }

Сначала мы преобразуем метод async onNext в Observable, async onNext два асинхронных мира. Это означает, что async/await внутри onNext будет соблюдаться. Затем мы переносим этот метод в Defer чтобы он не начинался сразу, когда он был создан. Вместо этого Concat будет называть следующий отложенный наблюдаемый только тогда, когда предыдущий закончен.

Ps. Я надеюсь, что в следующем выпуске Rx.Net будет поддержка async в подписке.