Как я жду ответа от объекта RX без введения условия гонки?
У меня есть служба, которая позволяет вызывающему пользователю отправлять команды и получать ответы асинхронно. В реальном приложении эти действия довольно несвязаны (какое-то действие отправит команду, и ответы будут выполняться независимо).
Однако в моих тестах мне нужно отправить команду, а затем дождаться ответа (первого) перед продолжением теста.
Ответы публикуются с использованием RX, и моя первая попытка кода была примерно такой:
service.SendCommand("BLAH");
await service.Responses.FirstAsync();
Проблема с этим заключается в том, что FirstAsync
будет работать, только если ответ поступит после этого ожидания, который уже был атакован. Если служба работает очень быстро, тест будет висеть на await
.
Моя следующая попытка исправить это состояла в том, чтобы вызвать FirstAsync()
перед отправкой команды, чтобы она имела результат, даже если она прибыла до ожиданий:
var firstResponse = service.Responses.FirstAsync();
service.SendCommand("BLAH");
await firstResponse;
Однако это все равно не срабатывает аналогичным образом. Похоже, что только когда await
попадает (GetAwaiter
), который он начинает слушать; поэтому существует то же самое условие гонки.
Если я изменю тему Subject на ReplaySubject
с буфером (или таймером), тогда я могу "обходить" это; однако в моих производственных классах нет смысла делать это; это будет только для тестирования.
Какой "правильный" способ сделать это в RX? Как я могу настроить то, что получит первое событие в потоке таким образом, чтобы не вводить условие гонки?
Вот небольшой тест, который иллюстрирует проблему в однопоточном режиме. Этот тест будет висеть неопределенно:
[Fact]
public async Task MyTest()
{
var x = new Subject<bool>();
// Subscribe to the first bool (but don't await it yet)
var firstBool = x.FirstAsync();
// Send the first bool
x.OnNext(true);
// Await the task that receives the first bool
var b = await firstBool; // <-- hangs here; presumably because firstBool didn't start monitoring until GetAwaiter was called?
Assert.Equal(true, b);
}
Я даже попытался вызвать Replay() в своем тесте, считая, что он будет буферизовать результаты; но это ничего не меняет:
[Fact]
public async Task MyTest()
{
var x = new Subject<bool>();
var firstBool = x.Replay();
// Send the first bool
x.OnNext(true);
// Await the task that receives the first bool
var b = await firstBool.FirstAsync(); // <-- Still hangs here
Assert.Equal(true, b);
}
Ответы
Ответ 1
Вы можете сделать это с помощью AsyncSubject
[Fact]
public async Task MyTest()
{
var x = new Subject<bool>();
var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject
firstBool.Connect();
// Send the first bool
x.OnNext(true);
// Await the task that receives the first bool
var b = await firstBool;
Assert.Equal(true, b);
}
AsyncSubject
в основном кэширует последнее полученное значение до вызова OnComplete
, а затем повторяет его.
Ответ 2
Отличный вопрос Дэнни. Это беспокоит много новых людей для Rx.
У FlagBug есть приемлемый ответ выше, но было бы еще проще добавить только одну строку
var firstBool = x.Replay();
firstBool.Connect(); //Add this line, else your IConnectableObservable will never connect!
Этот стиль тестирования в порядке. Но есть и другой способ, который по моему опыту - это то, к чему люди переезжают, когда они используют Rx немного дольше. Я предлагаю вам просто перейти прямо к этой версии! Но давайте туда медленно...
(пожалуйста, извините переключатель обратно в NUnit, так как у меня нет бегуна xUnit на этом ПК)
Здесь мы просто добавляем значения к List<T>
по мере их создания. Затем мы можем просто проверить содержимое списка в наших утверждениях:
[Test]
public void MyTest_with_List()
{
var messages = new List<bool>();
var x = new Subject<bool>();
x.Subscribe(messages.Add);
// Send the first bool
x.OnNext(true);
Assert.AreEqual(true, messages.Single());
}
Для этих супер простых тестов это нормально, но мы пропускаем некоторую верность при завершении последовательности, т.е. завершили ли это или не сделали ошибку?
Мы можем расширить этот стиль тестирования с помощью инструментов тестирования Rx (Rx-testing Nuget). В этом тесте мы используем MockObserver
/ITestableObserver<T>
, который мы (досадно) получаем из экземпляра TestScheduler
. Примечание. Я сделал расширение test/class extend ReactiveTest
[TestCase(true)]
[TestCase(false)]
public void MyTest_with_TestObservers(bool expected)
{
var observer = new TestScheduler().CreateObserver<bool>();
var x = new Subject<bool>();
x.Subscribe(observer);
x.OnNext(expected);
observer.Messages.AssertEqual(
OnNext(0, expected));
}
Это может показаться небольшим улучшением или даже, возможно, шагом назад с необходимостью создания планировщиков тестов и указанием ожидаемого времени, которое мы видим в сообщениях. Однако, как только вы начинаете вводить более сложные Rx-тесты, это становится очень ценным.
Вы можете продолжить тестирование, чтобы даже генерировать исходную последовательность вверх и указать, когда значения будут воспроизводиться в виртуальном времени. Здесь мы отбрасываем использование объекта и указываем, что в 1000ticks мы опубликуем значение (expected
). В этом утверждении мы снова проверяем значение, а также время получения значения. Поскольку мы сейчас представляем виртуальное время, нам также нужно сказать, когда мы хотим, чтобы время продвигалось вперед. Мы делаем это здесь, вызывая testScheduler.Start();
[TestCase(true)]
[TestCase(false)]
public void MyTest_with_TestObservables(bool expected)
{
var testScheduler = new TestScheduler();
var observer = testScheduler.CreateObserver<bool>();
var source = testScheduler.CreateColdObservable(
OnNext(1000, expected));
source.Subscribe(observer);
testScheduler.Start();
observer.Messages.AssertEqual(
OnNext(1000, expected));
}
Я написал больше о тестировании Rx на здесь
Ответ 3
У нас была та же проблема, что и у вас, и мы решили ее, превратив Наблюдаемое в Задачу. Это наиболее разумный способ, который я считаю, что при использовании Задачи вы наверняка не пропустите результат, если он завершен до того, как вы его ожидаете, и ваш код также будет ожидать результата, если задача еще не завершена.
var x = new Subject<bool>();
//Create a Task that will start running immediately to catch the first element
var myTask = firstBool.FirstAsync().ToTask();
// Send the first bool
x.OnNext(true);
//wait for the task to complete, or retrieve the result if it did complete already
var b = await myTask;
Assert.Equal(true, b);