Ответ 1
Там много вводящей в заблуждение информации о SubscribeOn
и ObserveOn
.
Резюме
-
SubscribeOn
перехватывает вызовы для единственного методаIObservable<T>
, которыйSubscribe
, и вызываетDispose
в возвращаемом дескриптореIDisposable
наSubscribe
. -
ObserveOn
перехватывает вызовы методамIObserver<T>
, которыеOnNext
,OnCompleted
иOnError
. - Оба метода вызывают соответствующие вызовы в указанном планировщике.
Анализ и демонстрации
Утверждение
ObserveOn устанавливает, где код в обработчике Subscribe выполняется:
более запутанным, чем полезным. То, что вы называете "обработчиком подписки", действительно является обработчиком OnNext
. Помните, что метод Subscribe
IObservable
принимает IObserver
, который имеет методы OnNext
, OnCompleted
и OnError
, но это методы расширения, которые обеспечивают удобные перегрузки, которые принимают lambdas, и создают IObserver
для вас.
Позвольте мне учесть этот термин; Я думаю, что обработчик "Подписаться" является кодом в наблюдаемом, который вызывается, когда вызывается Subscribe
. Таким образом, приведенное выше описание более близко напоминает цель SubscribeOn
.
SubscribeOn
SubscribeOn
заставляет метод Subscribe
наблюдаемого выполняться асинхронно в указанном планировщике или контексте. Вы используете его, когда вы не хотите вызывать метод Subscribe
на наблюдаемом из любого потока, на котором вы работаете, - как правило, потому что он может быть длительным, и вы не хотите блокировать вызывающий поток.
Когда вы вызываете Subscribe
, вы вызываете наблюдаемое, которое может быть частью длинной цепочки наблюдаемых. Это только наблюдаемое, что SubscribeOn
применяется к тому, что оно воздействует. Теперь может случиться так, что все наблюдаемые в цепочке будут подписаны сразу и в том же потоке, но это не обязательно. Подумайте, например, о Concat
, который подписывается только на каждый последовательный поток после завершения предыдущего потока, и обычно это будет происходить в любом потоке, который предшествует поток, называемый OnCompleted
.
Итак SubscribeOn
находится между вашим вызовом до Subscribe
и наблюдаемым, на который вы подписаны, перехватывая вызов и делая его асинхронным.
Это также влияет на удаление подписки. Subscribe
возвращает дескриптор IDisposable
, который используется для отмены подписки. SubscribeOn
гарантирует, что вызовы Dispose
запланированы в поставляемом планировщике.
Общей точкой смятения при попытке понять, что делает SubscribeOn
, является то, что обработчик Subscribe
наблюдаемого вполне может вызвать OnNext
, OnCompleted
или OnError
в этом же потоке. Однако его цель не влияет на эти вызовы. Это не редкость для завершения потока перед возвратом метода Subscribe
. Observable.Return
делает это, например. Давайте посмотрим.
Если вы используете метод Spy, который я написал, и запустите следующий код:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Вы получаете этот вывод (идентификатор потока может варьироваться, конечно):
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Вы можете видеть, что весь обработчик подписки работал в одном потоке и заканчивался перед возвратом.
Используйте SubscribeOn
для запуска асинхронно. Мы будем шпионить как на наблюдаемом Return
, так и на SubscribeOn
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
Эти выходы (добавленные мной номера строк):
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
01 - Основной метод работает в потоке 1.
02 - наблюдаемый Return
оценивается на вызывающем потоке. Мы просто получаем IObservable
здесь, пока ничего не подписывается.
03 - наблюдаемый SubscribeOn
оценивается в вызывающем потоке.
04 - Теперь, наконец, мы называем метод Subscribe
SubscribeOn
.
05 - Метод Subscribe
завершается асинхронно...
06 -... и поток 1 возвращается к основному методу. Это действие SubscribeOn в действии!
07 - Между тем SubscribeOn
запланировал вызов планировщика по умолчанию на Return
. Здесь он получен в потоке 2.
08 - И как Return
делает, он вызывает OnNext
в потоке Subscribe
...
09 - и SubscribeOn
- это только проход.
10,11 - То же самое для OnCompleted
12 - И последнее завершено обработчик подписки Return
.
Надеюсь, это очистит цель и эффект SubscribeOn
!
ObserveOn
Если вы думаете о SubscribeOn
как перехватчик для метода Subscribe
, который передает вызов другому тегу, тогда ObserveOn
выполняет одно и то же задание, но для OnNext
, OnCompleted
и OnError
.
Вспомним наш оригинальный пример:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Который дал этот результат:
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Теперь измените это, чтобы использовать ObserveOn
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Мы получаем следующий вывод:
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
01 - Основной метод работает в Thread 1.
02 - Как и ранее, наблюдаемый Return
оценивается на вызывающем потоке. Мы просто получаем IObservable
здесь, пока ничего не подписывается.
03 - Наблюдаемый ObserveOn
также оценивается в вызывающем потоке.
04 - Теперь мы снова подписываемся на вызывающий поток, сначала на ObserveOn
наблюдаемый...
05 -... который затем передает вызов на Return
наблюдаемый.
06 - Теперь Return
вызывает OnNext
в своем обработчике Subscribe
.
07 - Вот эффект ObserveOn
. Мы видим, что OnNext
запланирован асинхронно на Thread 2.
08 - Между тем Return
вызывает OnCompleted
в Thread 1...
09 - И обработчик подписки Return
завершен...
10 - и тогда делает обработчик подписки ObserveOn
...
11 - поэтому управление возвращается основному методу
12 - Между тем, ObserveOn
отправил Return
OnCompleted
вызов this в Thread 2. Это могло произойти в любое время в течение 09-11, потому что оно выполняется асинхронно. Так получилось, что он наконец-то позвонил.
Каковы типичные варианты использования?
Вы чаще всего будете видеть SubscribeOn
, который используется в графическом интерфейсе, когда вам нужно Subscribe
на длительный наблюдаемый и хотите как можно скорее выйти из потока диспетчера - возможно, потому, что вы знаете, что это одна из тех наблюдаемых, которые все это работает в обработчике подписки. Примените его в конце наблюдаемой цепочки, потому что это первый наблюдаемый, который вызывается при подписании.
Вы чаще всего увидите ObserveOn
, используемый в графическом интерфейсе, если вы хотите, чтобы вызовы OnNext
, OnCompleted
и OnError
были перенаправлены обратно в поток диспетчера. Примените его в конце наблюдаемой цепи для перехода назад как можно позже.
Надеюсь, вы увидите, что ответ на ваш вопрос заключается в том, что ObserveOnDispatcher
не будет иметь никакого значения для потоков, которые выполняются Where
и SelectMany
- все зависит от того, из какого потока поток их вызывает! обработчик потоковой подписки будет вызываться в вызывающем потоке, но невозможно сказать, где Where
и SelectMany
будут работать, не зная, как реализуется stream
.
Наблюдаемые со временем жизни, которые переживают вызов Subscribe
До сих пор мы смотрели исключительно на Observable.Return
. Return
завершает свой поток в обработчике Subscribe
. Это не атипично, но для потоков, которые, как правило, слишком много, чтобы пережить обработчик Subscribe
. Посмотрите Observable.Timer
, например:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Это возвращает следующее:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Вы можете четко видеть, что подписка завершена, а затем OnNext
и OnCompleted
вызывается позже в другом потоке.
Обратите внимание, что никакая комбинация SubscribeOn
или ObserveOn
не будет иметь никакого эффекта, по которому нить или планировщик Timer
не хочет вызывать OnNext
и OnCompleted
on.
Конечно, вы можете использовать SubscribeOn
для определения потока Subscribe
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
(я намеренно меняю на NewThreadScheduler
здесь, чтобы предотвратить путаницу в случае Timer
, чтобы получить тот же поток потока потока, что и SubscribeOn
)
Дарение:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3
Здесь вы можете четко видеть основной поток в потоке (1), возвращающийся после его вызовов Subscribe
, но подписка Timer
получает свой собственный поток (2), но вызовы OnNext
и OnCompleted
работают на нить (3).
Теперь для ObserveOn
, измените код на (для тех, которые следуют в коде, используйте пакет nuget rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Этот код немного отличается. Первая строка гарантирует, что у нас есть диспетчер, а также ObserveOnDispatcher
- это похоже на ObserveOn
, за исключением того, что он указывает, что мы должны использовать DispatcherScheduler
любого потока ObserveOnDispatcher
.
Этот код дает следующий результат:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1
Обратите внимание, что диспетчер (и основной поток) - это поток 1. Timer
по-прежнему вызывает OnNext
и OnCompleted
в потоке по его выбору (2), но ObserveOnDispatcher
выполняет сортировку вызовов на поток диспетчера, резьба (1).
Также обратите внимание, что если бы мы заблокировали поток диспетчера (например, Thread.Sleep
), вы увидите, что блок ObserveOnDispatcher
будет блокироваться (этот код лучше всего работает в основном методе LINQPad):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");
И вы увидите вывод следующим образом:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1
С помощью вызовов через ObserveOnDispatcher
можно выйти только после запуска Sleep
.
Ключевые моменты
Полезно иметь в виду, что Reactive Extensions - это, по сути, библиотека с бесплатной резьбой, и пытается быть настолько ленивым, насколько возможно, о том, в какой поток она работает, - вы должны сознательно вмешиваться в ObserveOn
, SubscribeOn
и передавать конкретные планировщики для операторов, которые принимают их, чтобы изменить это.
Ни один потребитель наблюдаемого не может сделать, чтобы контролировать, что он делает внутри - ObserveOn
и SubscribeOn
являются декораторами, которые обертывают площадь наблюдателей и наблюдаемых для маршальных вызовов через нитки. Надеюсь, эти примеры сделали это понятным.