Ответ 1
Введение
Ключом к этой проблеме является сортировка. В любом случае, вы смотрите на это, требуется некоторая форма буферизации. Несмотря на то, что какое-то сложное сочетание операторов может это сделать, я думаю, что это хороший пример, где Observable.Create
- хороший выбор.
Обобщение решения
Я приложил некоторые усилия, чтобы обобщить мой подход, чтобы принять любой тип ключа заказа. Для этого я ожидаю, что вам дадут:
- Функция выбора ключа, используемая для получения ключа события, типа
Func<TSource,TKey>
- Начальный ключ типа
TKey
- Функция для получения следующего ключа в последовательности типа
Func<TKey,TKey>
- Селектор результатов для генерации результата из парных событий в исходных потоках типа
Func<TSource,TSource,TSource>
Поскольку я использую только целую последовательность на основе 1 для своих тестов, это выполняется:
- keySelector:
i => i
- firstKey:
1
- nextKeyFunc:
k => k+1
- resultSelector:
(left,right) => left
Сортировка
Вот моя попытка Sort
. Он буферизует события в словарь и сбрасывает их как можно скорее для подписчика:
public static IObservable<TSource> Sort<TSource, TKey>
(this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc)
{
return Observable.Create<TSource>(o =>
{
var nextKey = firstKey;
var buffer = new Dictionary<TKey, TSource>();
return source.Subscribe(i =>
{
if (keySelector(i).Equals(nextKey))
{
nextKey = nextKeyFunc(nextKey);
o.OnNext(i);
TSource nextValue;
while (buffer.TryGetValue(nextKey, out nextValue))
{
buffer.Remove(nextKey);
o.OnNext(nextValue);
nextKey = nextKeyFunc(nextKey);
}
}
else buffer.Add(keySelector(i), i);
});
});
}
Я должен сказать, что это довольно наивная реализация. В прошлом году в производственном коде я подробно остановился на этой проблеме с конкретной обработкой ошибок, буфером фиксированного размера и тайм-аутами, чтобы предотвратить утечку ресурсов. Однако это будет сделано для этого примера.:)
С этим отсортированным (извините!), теперь мы можем посмотреть на обработку нескольких потоков.
Объединение результатов
Первая попытка
Моя первая попытка состоит в том, чтобы создать неупорядоченный поток событий, который был замечен требуемым количеством раз. Затем это можно было бы отсортировать. Я делаю это, группируя элементы по ключу, используя GroupByUntil
, чтобы удерживать каждую группу до тех пор, пока не будут захвачены два элемента. Каждая группа представляет собой поток результатов одного и того же ключа. Для простого примера целочисленных событий я могу просто взять последний элемент каждой группы. Однако мне это не нравится, потому что это неудобно для более реальных сценариев, где каждый поток результатов может способствовать чему-то полезному. Я включаю код ради интереса. Обратите внимание, что тесты могут быть разделены между этой и моей второй попыткой, я принимаю неиспользованный параметр resultSelector:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
(this IObservable<TSource> left,
IObservable<TSource> right,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc
Func<TSource,TSource,TSource> resultSelector)
{
return left.Merge(right)
.GroupByUntil(keySelector, x => x.Take(2).LastAsync())
.SelectMany(x => x.LastAsync())
.Sort(keySelector, firstKey, nextKeyFunc);
}
Кроме того: вы можете взломать предложение SelectMany
, чтобы решить, как выбрать результаты. Одно из преимуществ этого решения заключается во второй попытке, заключается в том, что в сценариях со многими результирующими потоками легче увидеть, как его расширить, чтобы выбрать, первые два из трех кортежей результатов, которые нужно будет прибыть.
Вторая попытка
Для этого подхода я сортирую каждый поток независимо, а затем Zip
результаты вместе. Мало того, что это гораздо более простая работа, также гораздо проще комбинировать результаты из каждого потока интересными способами. Чтобы совместить тесты с моим первым подходом, я выбираю функцию resultSelector для использования первых событий потока в качестве результатов, но, очевидно, у вас есть гибкость, чтобы сделать что-то полезное в вашем сценарии:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
(this IObservable<TSource> left,
IObservable<TSource> right,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc,
Func<TSource, TSource, TSource> resultSelector)
{
return Observable.Zip(
left.Sort(keySelector, firstKey, nextKeyFunc),
right.Sort(keySelector, firstKey, nextKeyFunc),
resultSelector);
}
Кроме того, нетрудно увидеть, как этот код будет распространен на более общий случай, принимающий любое количество входных потоков, но, как упоминалось ранее, использование Zip
делает его весьма негибким в отношении блокировки при заданном пока не появятся результаты всех потоков.
Испытательные случаи
Наконец, вот мои тесты повторяют ваши примеры сценариев. Чтобы запустить их, импортируйте пакеты nuget rx-testing
и nunit
и поместите указанные выше реализации в статический класс:
public class ReorderingEventsTests : ReactiveTest
{
[Test]
public void ReorderingTest1()
{
var scheduler = new TestScheduler();
var s1 = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(400, 3),
OnNext(500, 4));
var s2 = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 3),
OnNext(300, 2),
OnNext(500, 4));
var results = scheduler.CreateObserver<int>();
s1.OrderedCollect(
right: s2,
keySelector: i => i,
firstKey: 1,
nextKeyFunc: i => i + 1,
resultSelector: (left,right) => left).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(300, 2),
OnNext(400, 3),
OnNext(500, 4));
}
[Test]
public void ReorderingTest2()
{
var scheduler = new TestScheduler();
var s1 = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4));
var s2 = scheduler.CreateColdObservable(
OnNext(100, 4),
OnNext(200, 3),
OnNext(300, 2),
OnNext(400, 1));
var results = scheduler.CreateObserver<int>();
s1.OrderedCollect(
right: s2,
keySelector: i => i,
firstKey: 1,
nextKeyFunc: i => i + 1,
resultSelector: (left, right) => left).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(400, 1),
OnNext(400, 2),
OnNext(400, 3),
OnNext(400, 4));
}
}
Коррекция во избежание повторения
Заключительный комментарий, потому что я ненавижу повторять себя в коде, вот настройка, которая позволяет избежать повторяющегося способа, который я называю Sort
во втором подходе. Я не включил его в основной корпус, чтобы не путать читателей, незнакомых с карри:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
(this IObservable<TSource> left,
IObservable<TSource> right,
Func<TSource, TKey> keySelector,
TKey firstKey,
Func<TKey, TKey> nextKeyFunc,
Func<TSource, TSource, TSource> resultSelector)
{
Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
events => events.Sort(keySelector, firstKey, nextKeyFunc);
return Observable.Zip(
curriedSort(left),
curriedSort(right),
resultSelector);
}