Какова цель возврата IDisposable в интерфейс IObservable <T>?

Я просматриваю книгу Head First Design Patterns и прилагаю все усилия, чтобы преобразовать код с их Java на С#. После того, как в книге обсуждался шаблон наблюдателя, он упомянул, что Java имеет классы/интерфейсы, встроенные, как и .NET4. Поэтому я начал исследовать, как правильно использовать его, и я больше всего понял его, кроме метода Subscribe().

Если вы посмотрите на статью MSDN при попытке подписания IObserver, метод возвращает IDisposable. Зачем это необходимо? Почему бы просто не реализовать метод, который не отвечает на IObserver на основе аргумента метода? Я исследовал причину использования интерфейса IDisposable. Я также прочитал это, но не совсем понял разницу/то, что он пытался сказать мне:

Он возвращает ссылку на интерфейс IDisposable. Это позволяет наблюдателям отказаться от подписки (то есть прекратить получать уведомления) до того, как поставщик завершит их отправку и вызвал метод подписчика OnCompleted.

Ответы

Ответ 1

Информация, необходимая для отмены подписки, будет зависеть от того, как издатель событий управляет подписками. Подход, используемый для событий - переход к методу Remove, который ранее передал делегат методу Add, является своего рода работоспособным, но имеет некоторые существенные недостатки. Среди них:

  • Часто требуется, чтобы издатель событий выполнял линейный поиск, чтобы найти запись, содержащую информацию, связанную с подпиской. Если есть возможность события, имеющего много подписчиков, это может привести к неправильному поведению O (N ^ 2). Если подписчики хранятся в каком-либо связанном списке (либо связанных объектах, либо в слотах с индексированным массивом), а в запросе на отмену подписки хранится информация об отмене подписки, подписки и отписки могут обрабатываться в постоянное время. Кроме того, без подписки можно безопасно и легко обрабатывать блокировку без блокировки (используя один "CompareExchange" на том, что, скорее всего, будет беспроцентным слотом массива), которое можно безопасно выполнить в контексте "Finalize".
  • Если один делегат несколько раз подписывается на событие, где имеет место порядок обработки, а код пытается отменить первую подписку, последняя подписка будет отменена, а первая останется в силе.
  • Если подписанный делегат `D` подписан, делегат многоадресной рассылки` ABCD`, содержащий `A`,` B`, `C` и` D`, подписан, а затем `D` отменяется, а затем делегирует` DABC `будет оставаться подписанным в порядке, даже если код пытается отказаться от подписки` ABCD`. Обратите внимание, что можно избежать этой проблемы, если использовать `List` вместо` delegateType.Combine`, но остальные проблемы останутся.

Наличие метода подписки на события возвращает объект, который может использоваться для отмены подписки, позволяет избежать этих проблем. Тогда самый большой вопрос заключается в том, какой тип объекта следует использовать. Приходят в голову три варианта:

  • Делегат (возможно, без параметров, возвращающий `void`)
  • Некоторый интерфейс "ICancelSubscription" с единственным методом (возможно, без параметров, возвращающим "void" ), чтобы отменить подписку
  • `IDisposable`, интерфейс, который существует, имеет единственный метод без параметров и широко используется для целей, связанных с очисткой

Использование делегата было бы разумным выбором. Это может быть легко инкапсулировать любую информацию, необходимую для отмены подписки, без необходимости беспокоиться о том, какую форму может принять эта информация. Использование делегата повлечет за собой выделение по крайней мере одного дополнительного объекта кучи (для самого делегата) и, возможно, двух (второй - объект, содержащий информацию о запрете подписки). Использование IDisposable будет по существу тем же самым, что и при использовании делегата, за исключением того, что называть Dispose, а не Invoke; во многих случаях, однако, IDisposable будет иметь небольшое преимущество с точки зрения эффективности. Использование другого интерфейса также будет работоспособным, но на самом деле не будет иметь никакого преимущества перед использованием существующего IDisposable.

Ответ 2

Кажется, что ваш главный вопрос заключается в том, почему Microsoft выбирает:

interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

вместо

interface IObservable<T>
{
    void Subscribe(IObserver<T> observer);
    void Unsubscribe(IObserver<T> observer);
}

Пока я не участвую в проектных встречах или рассуждениях, которые выходят из них, я могу предположить некоторые причины, почему этот выбор был сделан.

Лучшей причиной, по которой я могу думать о выборе первого формата, является то, что он позволяет отслеживать только IDisposable, возвращенный из Subscription, а не наблюдаемый и наблюдатель. Например, предположим, что у вас был такой код:

var obsSource = /* some observable source */
var subscription = obsSource.Where(x => x != null).Subscribe(Console.WriteLine);
// stuff...
subscription.Dispose();

В этой ситуации мне никогда не нужно ссылаться на подписанную на наблюдаемую (возвращенную из Where) и явно не создавать наблюдателя (через метод расширения Subscribe). Если был выбран второй вариант, вам нужно будет сделать это:

var obsSource = /* some observable source */
var filtered = obsSource.Where(x => x != null);
var observer = Observer.Create(Console.WriteLine);
filtered.Subscribe(observer);
// stuff...
filtered.Unsubscribe(observer);

На первый взгляд это не выглядит слишком по-разному, но, как я уже обсуждал в другом вопросе, промежуточные наблюдаемые данные не должны храниться после того, как подписка будет сделана. Если бы был сделан второй подход, вам все равно нужно было бы создавать и удерживать наблюдателей для каждого шага в цепочке, и вам также пришлось бы держать ссылки на промежуточные наблюдаемые.

В версии 1 двойственность с IEnumerable легче увидеть, чем метод 2. Возможно, это была часть первоначального дизайна, но это, безусловно, менее важная причина для долгого пути.

Ответ 3

Пара методов Subscribe и Unsubscribe будет не-композиционной. Каждому оператору необходимо будет хранить словарь наблюдателей, которые были переданы для Подписки, сопоставляя их с каждым экземпляром наблюдателя, который был передан в зависимые наблюдаемые последовательности (переданные оператору).

Например, рассмотрим возможность записи оператора Merge для двух источников. Сегодня это выглядит примерно так (textarea compiled):

static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
    return Observable.Create<T>(observer =>
    {
        var n = 2;

        var mergeObserver = Observer.Create<T>(
            observer.OnNext,
            observer.OnError,
            () =>
            {
                // protected by the gate, see use of Synchronize below
                if (--n == 0)
                    observer.OnCompleted();
            }
        );

        var gate = new object();

        return new CompositeDisposable(
            xs.Synchronize(gate).Subscribe(mergeObserver),
            ys.Synchronize(gate).Subscribe(mergeObserver)
        );
    });
}

Как вы можете видеть, состав последовательностей также приводит к составу объектов IDisposable, возвращаемых из вызовов Subscribe. Обратите внимание, что в Observable есть много вещей. Создайте, что автоматически удаляет возвращаемый IDisposable при отправке терминального сообщения данному наблюдателю. В этом случае вызовы observer.OnError и observer.OnCompleted заботятся об утилизации обеих подписей в CompositeDisposable. (Но это совсем другой предмет, чтобы говорить о каком-то времени.)

Нижеприведенный код гипотетический, предполагая существование подписей /Unsubscribe пар на IObservable (следовательно, с помощью метода Create factory, который имеет два действия):

static IObservable<T> Merge<T>(IObservable<T> xs, IObservable<T> ys)
{
    var map = new Dictionary<IObserver<T>, IObserver<T>>();

    return Observable.Create<T>(
        subscribe: observer =>
        {
            var gate = new object();
            var n = 2;

            var mergeObserver = Observer.Create<T>(
                x =>
                {
                    lock (gate)
                        observer.OnNext(x);
                },
                ex =>
                {
                    lock (gate)
                        observer.OnError(ex);
                },
                () =>
                {
                    lock (gate)
                        if (--n == 0)
                            observer.OnCompleted();
                }
            );

            //
            // Using .Synchronize(gate) would be a mess, because then we need to
            // keep the  two synchronized sequences around as well, such that we
            // can call Unsubscribe on those. So, we're "better off" inlining the
            // locking code in the observer.
            //
            // (Or: how composition goes down the drain!)
            //
            xs.Subscribe(mergeObserver);
            ys.Subscribe(mergeObserver);

            lock (map)
                map[observer] = mergeObserver;
        },
        unsubscribe: observer =>
        {
            var mergeObserver = default(IObserver<T>);
            lock (map)
                map.TryGetValue(observer, out mergeObserver);

            if (mergeObserver != null)
            {
                xs.Unsubscribe(mergeObserver);
                ys.Unsubscribe(mergeObserver);
            }
        }
    );
}

Остерегайтесь, это гипотетично; Я даже не думал о более крайних случаях, ни о том, как этот Create будет работать, чтобы очистить себя после вызова OnError или OnCompleted. Кроме того, с помощью Merge в качестве примера нам повезло, что у нас нет других ресурсов, которые нужно заботиться во время "Отказаться от подписки" (например, заданий планировщика).

Надеюсь, что это поможет,

-Bart (команда Rx)

Ответ 4

Вызов метода отмены подписки означает, что вам нужно пройти в том же экземпляре, который подписан. Вы должны держать ссылку. Если у вас много наблюдаемых различного типа, вам нужно много разных ссылок.

Очистка кода становится беспорядочной с помощью метода отмены подписки.

Однако с IDisposable вам нужна только одна структура для хранения всех ссылок - List<IDisposable> (или вы можете использовать Rx CompositeDisposable).

Теперь ваш очищающий код может быть очень аккуратным и аккуратным.

Я пошел дальше и создал экземпляры IDisposable для всего кода очистки - не только из подписчиков Rx. Это облегчает жизнь.

Ответ 5

В дополнение к другим причинам, указанным IDisposable, вы также можете использовать блок using.

using (var subscription = source.Subscribe(observer))
{
    // ...
}

Ответ 6

Это позволяет очень легко гарантировать, что подписка на наблюдателя прекращается, когда сам наблюдатель Disposed. Считайте, что Subscribe обычно вызывается классом, который реализует IObservable:

class MyObserver<Foo> : IObserver<Foo>, IDisposable {

    private IDisposable _subscription;

    public MyObserver(IObservable<T> eventSource) {
        _subscription = eventSource.Subscribe(this);
    }

    // implementation of IObservable members here

    public void Dispose() {
        _subscription.Dispose();
    }
}

Теперь, когда мой экземпляр MyObserver будет удален, подписка будет автоматически удаляться вместе с ней:

public partial class MyGUIThing : Form {
    private MyObservable<Foo> _observer = new MyObservable<Foo>(someEventSource);
    // whatever else
}

Когда экземпляр этой формы создается, начинается подписка. Когда форма закрыта и завершена _observer, подписка также будет автоматически удалена.