Реактивные расширения кажутся очень медленными - я что-то делаю неправильно?
Я оцениваю Rx для проекта торговой платформы, который должен обрабатывать тысячи сообщений в секунду. Существующая платформа имеет сложную систему маршрутизации событий (многоадресные делегаты), которая отвечает на эти сообщения и выполняет много последующей обработки.
Я посмотрел на Reactive Extensions для очевидных преимуществ, но заметил, что он несколько медленнее, обычно в 100 раз медленнее.
Я создал unit test, чтобы продемонстрировать это, которое запускает простой шаг в 1 миллион раз, используя различные Rx-вкусы и прямое-делегированное делегирование "контроль".
Вот результаты:
Delegate - (1000000) - 00:00:00.0410000
Observable.Range() - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher - (1000000) - 00:00:03.0360000
Как вы можете видеть, все методы Rx ~ 100 раз медленнее, чем эквивалент делегата. Очевидно, Rx делает многое под обложками, которые будут использоваться на более сложном примере, но это просто невероятно медленно.
Является ли это нормальным или мои предположения о тестировании недействительными? Код Nunit для вышеописанного -
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;
namespace RxTests
{
[TestFixture]
class ReactiveExtensionsBenchmark_Tests
{
private int counter = 0;
[Test]
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;
Action<int> a = (i) => { counter++; };
DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
}
public void ObservableRangeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Observable.Range(0, iterations).Subscribe(action);
OutputTestDuration("Observable.Range()", start);
}
public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var eventSubject = new Subject<int>();
var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
events.Subscribe(action);
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => eventSubject.OnNext(1)
);
OutputTestDuration("Subject.Subscribe() - " + mode, start);
}
public void DelegateSmokeTest(int iterations, Action<int> action)
{
counter = 0;
long start = DateTime.Now.Ticks;
Enumerable.Range(0, iterations).ToList().ForEach
(
a => action(1)
);
OutputTestDuration("Delegate", start);
}
/// <summary>
/// Output helper
/// </summary>
/// <param name="test"></param>
/// <param name="duration"></param>
public void OutputTestDuration(string test, long duration)
{
Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
}
/// <summary>
/// Test timing helper
/// </summary>
/// <param name="elapsedTicks"></param>
/// <returns></returns>
public string ElapsedDuration(long elapsedTicks)
{
return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
}
}
}
Ответы
Ответ 1
Моя догадка заключается в том, что команда Rx фокусируется на создании функциональности в первую очередь и не заботится о оптимизации производительности.
Используйте профилировщик для определения узких мест и замены медленных классов Rx своими оптимизированными версиями.
Ниже приведены два примера.
Результаты:
Delegate - (1000000) - 00:00:00.0368748
Simple - NewThread - (1000000) - 00:00:00.0207676
Simple - CurrentThread - (1000000) - 00:00:00.0214599
Simple - Immediate - (1000000) - 00:00:00.0162026
Simple - ThreadPool - (1000000) - 00:00:00.0169848
FastSubject.Subscribe() - NewThread - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool - (1000000) - 00:00:00.0529137
Прежде всего, кажется, очень важно, как осуществляется наблюдение. Здесь наблюдаемый, который нельзя отменить, но он быстро:
private IObservable<int> CreateFastObservable(int iterations)
{
return Observable.Create<int>(observer =>
{
new Thread(_ =>
{
for (int i = 0; i < iterations; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
}).Start();
return () => { };
});
}
Тест:
public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var start = Stopwatch.StartNew();
var observable = CreateFastObservable(iterations);
observable.SubscribeOn(scheduler).Run(action);
OutputTestDuration("Simple - " + mode, start);
}
Субъекты добавляют много накладных расходов. Здесь тема, которая лишена большей части функциональности, ожидаемой от субъекта, но быстро:
class FastSubject<T> : ISubject<T>
{
private event Action onCompleted;
private event Action<Exception> onError;
private event Action<T> onNext;
public FastSubject()
{
onCompleted += () => { };
onError += error => { };
onNext += value => { };
}
public void OnCompleted()
{
this.onCompleted();
}
public void OnError(Exception error)
{
this.onError(error);
}
public void OnNext(T value)
{
this.onNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
this.onCompleted += observer.OnCompleted;
this.onError += observer.OnError;
this.onNext += observer.OnNext;
return Disposable.Create(() =>
{
this.onCompleted -= observer.OnCompleted;
this.onError -= observer.OnError;
this.onNext -= observer.OnNext;
});
}
}
Тест:
public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
counter = 0;
var start = Stopwatch.StartNew();
var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();
observable.SubscribeOn(scheduler).Run(action);
OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}
Ответ 2
Обновление для Rx 2.0: я взял код из исходного сообщения с (почти) последней версией Linqpad beta 4.42.04 (ну там 06, но в любом случае):
![Rx Main assemblies]()
... и немного изменил его, чтобы использовать новый синтаксис планировщика Rx v2:
public void ReactiveExtensionsPerformanceComparisons()
{
int iterations = 1000000;
Action<int> a = (i) => { counter++; };
DelegateSmokeTest(iterations, a);
ObservableRangeTest(iterations, a);
SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread");
SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread");
SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate");
SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool");
// I *think* this is the same as the ThreadPool scheduler in my case
SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");
// doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there a workaround; the Instance property on it is obsolete
//SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool");
}
Примечание: результаты сильно различаются, в редких случаях Threadpool превосходит newThread, но в большинстве случаев у NewThread есть небольшое преимущество над планировщиками под ним в списке:
Delegate - (1000000) - 00:00:00.0440025
Observable.Range() - (1000000) - 00:00:01.9251101
Subject.Subscribe() - NewThread - (1000000) - 00:00:00.0400023
Subject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0530030
Subject.Subscribe() - Immediate - (1000000) - 00:00:00.0490028
Subject.Subscribe() - ThreadPool - (1000000) - 00:00:00.0490028
Subject.Subscribe() - Default - (1000000) - 00:00:00.0480028
Таким образом, кажется, что они очень сильно работали на производительности.
Ответ 3
Помните, что ваш делегат не гарантирует безопасность потока - он буквально вызывает делегата из любого потока, из которого он вызвал, тогда как при вызове Observable.ObserveOn для отправки уведомлений другим потокам Rx.NET должен делать блокировку, чтобы сделать уверен, что он делает то, что, по вашему мнению, делает.
Таким образом, делегаты могут двигаться очень быстро, но если вы хотите создать что-то практическое, используя это, вы в конечном итоге создадите синхронизацию, которая замедлит вас. При этом Rx, как и LINQ, является абстракцией - если вам нужно, чтобы это было смехотворно быстро, вы должны начать писать уродливый код.