Реактивные расширения против FileSystemWatcher
Одна из вещей, которые долгое время беспокоили меня о FileSystemWatcher, - это способ, которым он запускает несколько событий для одного логического изменения файла. Я знаю, почему это происходит, но я не хочу заботиться - я просто хочу повторно просмотреть файл один раз, а не 4-6 раз подряд. В идеале, произойдет событие, которое срабатывает только тогда, когда заданный файл будет изменен, а не каждый шаг на этом пути.
На протяжении многих лет я придумывал различные решения этой проблемы, различной степени уродства. Я думал, что Reactive Extensions станет окончательным решением, но там что-то я не делаю правильно, и я надеюсь, что кто-то может указать на мою ошибку.
У меня есть метод расширения:
public static IObservable<IEvent<FileSystemEventArgs>> GetChanged(this FileSystemWatcher that)
{
return Observable.FromEvent<FileSystemEventArgs>(that, "Changed");
}
В конечном счете, я хотел бы получить одно событие за имя файла за определенный период времени, чтобы четыре события в строке с одним именем файла были сведены к одному событию, но я ничего не теряю, если несколько файлов изменены в то же время. BufferWithTime
звучит как идеальное решение.
var bufferedChange = watcher.GetChanged()
.Select(e => e.EventArgs.FullPath)
.BufferWithTime(TimeSpan.FromSeconds(1))
.Where(e => e.Count > 0)
.Select(e => e.Distinct());
Когда я подписываюсь на это наблюдаемое, одно изменение в контролируемом файле трижды запускает мой метод подписки, что скорее поражает цель. Если я удалю вызов Distinct()
, я вижу, что каждый из четырех вызовов содержит два одинаковых события, поэтому происходит буферизация. Увеличение TimeSpan, переданное на BufferWithTime
, похоже, не имеет эффекта - я прошел до 20 секунд без каких-либо изменений в поведении.
Это мой первый набег на Rx, поэтому я, вероятно, пропустил что-то очевидное. Я делаю это неправильно? Есть ли лучший подход? Спасибо за любые предложения...
Ответы
Ответ 1
Моя ошибка. Как-то у меня есть несколько FileSystemWatchers, которые контролируют друг друга. Наблюдаемый запускал один раз для каждого наблюдателя, но BufferWithTime
, похоже, работает правильно. Мне все еще нужно выяснить, почему мои наблюдатели запускают события для папок, которые, как я думал, они настроены на игнорирование, но это не имеет никакого отношения к Rx или к этому вопросу.
На самом деле, может быть, я смогу справиться с этой проблемой и переключиться на то, чтобы один наблюдатель контролировал родительскую папку, используя Rx для фильтрации событий из папок, которые меня не интересуют.
Ответ 2
Просто, чтобы размять старую тему, так как я сейчас над этим работаю:
Конечно, эта тема пренебрежимо мала в контексте просмотра одного файла, так как FileSystemWatcher запускает только каждые 3 секунды с событием Changed для одного файла при отслеживании размера с помощью
_fileSystemWatcher.NotifyFilter = NotifyFilters.Size | ....
Но пусть предположим, что FileSystemWatcher запускает много событий подряд (возможно, многие файлы изменены/переименованы/созданы), а другие люди читают это:
В этом случае вы не хотите использовать Throttle или BufferWithTime:
Throttle
немного вводит в заблуждение.. он запрещает любую стрельбу до истечения времени TimeSpan без события. Смысл: он никогда не срабатывает, когда вы используете что-то вроде Throttle(TimeSpan.FromMilliseconds(200))
, и после каждого события происходит пауза < 200 мс. Так что это не "дросселирование" людей. Это полезно для ввода пользователем, когда вы хотите подождать, пока пользователь перестанет набирать что-то. Это плохо для дросселирования нагрузки.
BufferWithTime
также не то, что вы хотите: он просто заполняет буфера времени. Хорошо, если у вас высокая начальная нагрузка на событие, например, открытие подключения к веб-сервису. В этом случае вам нужно будет обрабатывать события каждый раз "секунды". Но не при балансировке нагрузки, так как количество событий не изменяется.
Решение - это метод Sample(TimeSpan time)
: он принимает последнее событие в TimeSpan, которое является "настоящим" дросселем. Я думаю, что ребята Rx действительно испортили именование в этом случае.
Ответ 3
вы можете использовать группу для агрегирования событий файловой системы для имени файла и использовать полученный результат с помощью метода расширений Throttle. Я написал небольшой пример с использованием целых чисел, но основная идея такая же.
var obs = from n in Enumerable.Range(1, 40).ToObservable()
group n by n / 10 into g
select new { g.Key, Obs = g.Throttle(TimeSpan.FromMilliseconds(10.0)) } into h
from x in h.Obs
select x;
obs.Subscribe(x => Console.WriteLine(x));
выходы:
9
19
29
39
40
которая для каждой группы (n/10
) - последнее наблюдаемое целое число.
Ответ 4
BufferWithTime.Where(). Выберите (...) выполнит задание, но то, что вы действительно хотите, - Throttle()