Ответ 1
Позвольте мне свести ваш вопрос к его сути:
Есть ли способ, используя
TestScheduler
, выполнить реактивный конвейер и дождаться его завершения, даже если он содержит асинхронные вызовы?
Я должен предупредить вас впереди, здесь нет быстрого и легкого ответа, нет удобного "трюка", который можно развернуть.
Асинхронные вызовы и планировщики
Чтобы ответить на этот вопрос, я думаю, нам нужно прояснить некоторые моменты. Термин "асинхронный вызов" в вышеуказанном вопросе, по-видимому, используется специально для обозначения методов с помощью подписи Task
или Task<T>
, то есть методов, которые используют параллельную библиотеку задач (TPL) для асинхронного запуска.
Это важно отметить, поскольку Reactive Extensions (Rx) использует другой подход к обработке асинхронных операций.
В Rx введение concurrency управляется через планировщик, тип, реализующий интерфейс IScheduler
. Любая операция, которая вводит concurrency, должна сделать доступным параметр планировщика, чтобы вызывающий мог решить соответствующий планировщик. Основная библиотека рабски придерживается этого принципа. Так, например, Delay
позволяет спецификацию планировщика, но Where
не работает.
Как вы можете видеть из источника, IScheduler
предоставляет несколько перегрузок Schedule
. Операции, требующие использования concurrency, используют их для планирования выполнения работы. Именно то, как выполняется эта работа, полностью откладывается на планировщик. Это сила абстракции планировщика.
Операции Rx, вводящие concurrency, обычно обеспечивают перегрузки, которые позволяют опустить планировщик, и в этом случае выбрать разумное значение по умолчанию. Это важно отметить, поскольку, если вы хотите, чтобы ваш код мог быть проверен с помощью TestScheduler
, вы должны использовать TestScheduler
для всех операций, которые вводят concurrency. Мошеннический метод, который не позволяет этого, может сорвать ваши усилия по тестированию.
TPL Scheduling Abstraction
У TPL есть своя абстракция для обработки concurrency: TaskScheduler
. Идея очень похожа. Вы можете прочитать об этом здесь..
Существуют два очень важных различия между двумя абстракциями:
- Планировщики Rx имеют представление первого класса собственного представления о времени - свойство
Now
. Планировщики TPL этого не делают. - Использование пользовательских планировщиков в TPL гораздо менее распространено, и нет эквивалентной передовой практики предоставления перегрузок для предоставления конкретного
TaskSchedulers
методу, представляющему concurrency (возвратTask
илиTask<T>
), Подавляющее большинство методов возвратаTask
предполагают использование стандартногоTaskScheduler
и не дают вам выбора о том, где выполняется работа.
Мотивация для TestScheduler
Мотивация использования TestScheduler
обычно двояка:
- Чтобы устранить необходимость "ждать" операций, ускоряя время.
- Чтобы проверить, что события произошли в ожидаемые моменты времени.
То, как это работает, полностью зависит от того, что планировщики имеют собственное понятие времени. Каждый раз, когда операция назначается с помощью IScheduler
, мы указываем, когда она должна выполняться - либо как можно скорее, либо в определенное время в будущем. Затем планировщик выполняет очередь для выполнения и выполняет его, когда достигнуто указанное время (в соответствии с самим планировщиком).
Когда вы вызываете Start
в TestScheduler
, он работает, опуская очередь всех операций со временем выполнения на уровне или до его текущего понятия Now
- и затем продвигая свои часы на следующее запланированное рабочее время и повторяя, пока его очередь не будет пуста.
Это позволяет использовать аккуратные трюки, например, возможность проверить, что операция никогда не приведет к событию! Если использовать в реальном времени это будет непростая задача, но с виртуальным временем это будет легко - как только очередь планировщика будет полностью пуста, тогда TestScheduler
сделает вывод, что никаких дальнейших событий не произойдет никогда, поскольку, если в очереди не осталось ничего, нет ничего, чтобы планировать дальнейшие задачи. На самом деле Start
возвращает именно эту точку. Для этого необходимо четко определить все параллельные операции, которые необходимо измерить, на TestScheduler
.
Пользовательский оператор, который небрежно делает свой собственный выбор планировщика, не позволяя этому выбору переопределяться, или операция, которая использует свою собственную форму concurrency без понятия времени (например, вызовы на основе TPL), затруднит, если не невозможно, управлять выполнением с помощью TestScheduler
.
Если у вас асинхронная операция выполняется другими способами, разумное использование методов AdvanceTo
и AdvanceBy
TestScheduler
может позволить вам координировать с этим внешним источником concurrency, но степень, в которой это достижимо зависит от контроля, предоставляемого этим иностранным источником.
В случае с TPL вы знаете, когда задача выполнена, что позволяет использовать ожидания и таймауты в тестах, такие же уродливые, как это может быть. Благодаря использованию TaskCompleteSources
(TCS) вы можете имитировать задачи и использовать AdvanceTo
для достижения конкретных точек и завершения TCS, но здесь нет простого подхода. Часто вам просто нужно прибегать к уродливым ожиданиям и тайм-аутам, потому что у вас нет достаточного контроля над иностранными concurrency.
Rx, как правило, имеет свободную резьбу и пытается избежать введения concurrency, где это возможно. И наоборот, вполне возможно, что для разных операций в цепочке вызовов Rx потребуются разные типы абстракции планировщика. Не всегда можно моделировать цепочку вызовов с помощью одного планировщика тестов. Конечно, у меня была причина использовать несколько TestSchedulers
для моделирования некоторых сложных сценариев - например, целям, использующим DispatcherScheduler
и TaskScheduler
, иногда требуется сложная координация, что означает, что вы не можете просто сериализовать свои операции на один TestScheduler
.
В некоторых проектах, над которыми я работал, было поручено использовать Rx для всех concurrency специально, чтобы избежать этих проблем. Это не всегда возможно, и даже в этих случаях использование TPL обычно неизбежно.
Одна конкретная точка боли
Одна конкретная точка боли Rx, которая оставляет много тестеров, царапающих их головы, заключается в том, что семейство преобразований TPL → Rx вводит concurrency. например ToObservable
, SelectMany
перегрузка, принимающая Task<T>
и т.д., не обеспечивает перегрузки планировщиком и коварно заставляет вас отключить поток TestScheduler
, даже если насмехается с TCS. При всей боли, которая возникает только при тестировании, я считаю это ошибкой. Здесь вы можете прочитать все об этом , и вы найдете предложенное Dave Sexton исправление, которое обеспечивает перегрузку для указания планировщика и рассматривается для включения. Возможно, вы захотите изучить этот запрос на растяжение.
Потенциальное обходное решение
Если вы можете редактировать свой код, чтобы использовать его, может оказаться полезным следующий вспомогательный метод. Он преобразует задачу в наблюдаемую, которая будет работать на TestScheduler и завершена в правильное виртуальное время.
Он планирует работу над TestScheduler, которая отвечает за сбор результата задачи - в виртуальное время, когда мы заявляем, что задача должна завершиться. Сама работа блокируется до тех пор, пока не будет получен результат задачи, что позволяет выполнять задачу TPL в течение долгого времени или до тех пор, пока не пройдет реальная сумма указанного времени, в этом случае будет выброшен a TimeoutException
.
Эффект блокировки работы означает, что TestScheduler
не будет продвигать свое виртуальное время до ожидаемого времени виртуального завершения задачи до тех пор, пока задача не завершится. Таким образом, остальная часть Rx-цепи может работать в режиме полной скорости в реальном времени, и мы ждем только задачи TPL, приостанавливая остальную цепочку в виртуальном времени завершения задачи, пока это происходит.
К сожалению, другие одновременные операции Rx, запланированные для запуска между виртуальным временем запуска операции на основе задачи и заявленным конечным виртуальным временем задачи, не блокируются, и их время виртуального завершения не будет затронуто.
Таким образом, установите duration
на длину виртуального времени, которое вы хотите, чтобы задача выглядела как взятая. Затем результат будет собран в любое время, когда виртуальное время запускается, плюс указанная продолжительность.
Установите timeout
на фактическое время, когда вы разрешите выполнение задачи. Если это занимает больше времени, генерируется исключение таймаута:
public static IObservable<T> ToTestScheduledObseravble<T>(
this Task<T> task,
TestScheduler scheduler,
TimeSpan duration,
TimeSpan? timeout = null)
{
timeout = timeout ?? TimeSpan.FromSeconds(100);
var subject = Subject.Synchronize(new AsyncSubject<T>(), scheduler);
scheduler.Schedule<Task<T>>(task, duration,
(s, t) => {
if (!task.Wait(timeout.Value))
{
subject.OnError(
new TimeoutException(
"Task duration too long"));
}
else
{
switch (task.Status)
{
case TaskStatus.RanToCompletion:
subject.OnNext(task.Result);
subject.OnCompleted();
break;
case TaskStatus.Faulted:
subject.OnError(task.Exception.InnerException);
break;
case TaskStatus.Canceled:
subject.OnError(new TaskCanceledException(task));
break;
}
}
return Disposable.Empty;
});
return subject.AsObservable();
}
Использование в вашем коде будет таким, и ваше утверждение будет проходить:
Observable
.Return(1)
.Select(i => Whatever().ToTestScheduledObseravble(
scheduler, TimeSpan.FromSeconds(1)))
.Concat()
.Subscribe(_ => Interlocked.Increment(ref count));
Заключение
Таким образом, вы не упустили какой-либо удобный трюк. Вам нужно подумать о том, как работает Rx и как работает TPL, и решить, будет ли:
- Вы избегаете смешивания TPL и Rx
- Вы издеваетесь над интерфейсом между TPL и Rx (используя TCS или аналогичный), поэтому вы каждый раз проверяете
- Вы живете с уродливыми ожиданиями и тайм-аутами и вообще откажитесь от
TestScheduler
- Вы смешиваете уродливые ожидания и таймауты с помощью
TestScheduler
, чтобы внести некоторые изменения в свои тесты.