Механизм синхронизации наблюдаемого объекта
Предположим, нам нужно синхронизировать доступ для чтения/записи к общим ресурсам. Несколько потоков будут обращаться к этому ресурсу как при чтении, так и в записи (чаще всего для чтения, иногда для записи). Предположим также, что каждая запись всегда вызывает операцию чтения (объект является наблюдаемым).
В этом примере я представлю себе такой класс (простить синтаксис и стиль, это просто для иллюстрации):
class Container {
public ObservableCollection<Operand> Operands;
public ObservableCollection<Result> Results;
}
Я пытаюсь использовать ReadWriterLockSlim
для этой цели, кроме того, я бы поставил его на уровне Container
(представьте, что объект не так прост, и одна операция чтения/записи может включать несколько объектов):
public ReadWriterLockSlim Lock;
Реализация Operand
и Result
не имеет смысла для этого примера.
Теперь представьте себе какой-то код, который соблюдает Operands
и произведет результат, чтобы вставить Results
:
void AddNewOperand(Operand operand) {
try {
_container.Lock.EnterWriteLock();
_container.Operands.Add(operand);
}
finally {
_container.ExitReadLock();
}
}
Наш гипотонический наблюдатель сделает что-то подобное, но будет потреблять новый элемент, и он будет блокироваться с помощью EnterReadLock()
для получения операндов, а затем EnterWriteLock()
для добавления результата (позвольте мне пропустить код для этого). Это приведет к исключению из-за рекурсии, но если я установил LockRecursionPolicy.SupportsRecursion
, тогда я просто открою свой код для мертвых блокировок (из MSDN):
По умолчанию новые экземпляры ReaderWriterLockSlim создаются с помощью флага LockRecursionPolicy.NoRecursion и не позволяют рекурсии. Эта политика по умолчанию рекомендуется для всех новых разработок, потому что рекурсия вводит ненужные сложности, а делает ваш код более склонным к взаимоблокировкам.
Я повторяю соответствующую часть для ясности:
Рекурсия [...] делает ваш код более склонным к взаимоблокировкам.
Если я не ошибаюсь в LockRecursionPolicy.SupportsRecursion
, если из того же потока я спрашиваю, допустим, блокировку чтения, тогда кто-то еще спрашивает о блокировке записи, тогда у меня будет мертвый замок, а то, что говорит MSDN, имеет смысл. Более того, рекурсия также ухудшит производительность также измеримым образом (и это не то, что я хочу, если я использую ReadWriterLockSlim
вместо ReadWriterLock
или Monitor
).
Вопрос (ы)
Наконец, мои вопросы (обратите внимание, что я не ищу обсуждения общих механизмов синхронизации, я бы знал, что неправильно для этого сценария производителя/наблюдаемого/наблюдателя):
- Что лучше в этой ситуации? Чтобы избежать
ReadWriterLockSlim
в пользу Monitor
(даже если в реальном мире чтение кода будет намного больше, чем пишет)?
- Откажитесь от такой грубой синхронизации? Это может даже повысить производительность, но это сделает код намного сложнее (конечно, не в этом примере, а в реальном мире).
- Должен ли я просто делать уведомления (из наблюдаемой коллекции) асинхронными?
- Что-то еще я не вижу?
Я знаю, что нет лучшего механизма синхронизации, поэтому инструмент, который мы используем, должен быть правильным для нашего случая, но мне интересно, есть ли какая-то лучшая практика или я просто игнорирую что-то очень важное между потоками и наблюдателями (представьте, что используйте Microsoft Reactive Extensions, но вопрос является общим, не привязанным к этой структуре).
Возможные решения?
Я бы попытался сделать события (как-то) отложенными:
1-е решение
Каждое изменение не запускает событие CollectionChanged
, оно хранится в очереди. Когда поставщик (объект, который выталкивает данные) завершил, он вручную заставит очередь быть сброшенной (последовательно поднимая каждое событие). Это может быть сделано в другом потоке или даже в потоке вызывающего (но вне блокировки).
Он может работать, но он сделает все менее "автоматическим" (каждое уведомление об изменении должно запускаться вручную самим производителем, больше кода для записи, больше ошибок).
2-е решение
Другое решение может заключаться в предоставлении ссылки на наш замок на наблюдаемую коллекцию. Если я обернул ReadWriterLockSlim
в пользовательский объект (полезно скрыть его в удобном для использования объекте IDisposable
), я могу добавить ManualResetEvent
, чтобы уведомить, что все блокировки были выпущены таким образом, сама коллекция может вызывать события (снова в том же потоке или в другом потоке).
3-е решение
Еще одна идея - просто сделать события асинхронными. Если обработчику событий потребуется блокировка, он будет остановлен, чтобы подождать его временного интервала. Для этого я беспокоюсь о большом количестве потоков, которое может быть использовано (особенно если из пула потоков).
Честно говоря, я не знаю, применимо ли какое-либо из них в приложении реального мира (лично - с точки зрения пользователей - я предпочитаю второй вариант, но он подразумевает коллекцию для всего, и он делает коллекцию осведомленной о потоковом режиме, и я бы избегал это, если возможно). Я не хотел бы делать код более сложным, чем необходимо.
Ответы
Ответ 1
Это похоже на многопоточную рассол. Очень сложно работать с рекурсией в этой цепочке цепочек событий, в то же время избегая взаимоблокировок. Возможно, вы захотите рассмотреть проблему вокруг проблемы.
Например, вы можете сделать добавление операнда асинхронным к началу события:
private readonly BlockingCollection<Operand> _additions
= new BlockingCollection<Operand>();
public void AddNewOperand(Operand operand)
{
_additions.Add(operand);
}
И тогда фактическое добавление произойдет в фоновом потоке:
private void ProcessAdditions()
{
foreach(var operand in _additions.GetConsumingEnumerable())
{
_container.Lock.EnterWriteLock();
_container.Operands.Add(operand);
_container.Lock.ExitWriteLock();
}
}
public void Initialize()
{
var pump = new Thread(ProcessAdditions)
{
Name = "Operand Additions Pump"
};
pump.Start();
}
Это разделение жертвует некоторой последовательностью - код, запущенный после того, как метод add фактически не узнает, когда добавление действительно произошло, и, возможно, это проблема для вашего кода. Если это так, это можно переписать для подписки на наблюдение и использовать Task
для сигнала, когда добавление завершается:
public Task AddNewOperandAsync(Operand operand)
{
var tcs = new TaskCompletionSource<byte>();
// Compose an event handler for the completion of this task
NotifyCollectionChangedEventHandler onChanged = null;
onChanged = (sender, e) =>
{
// Is this the event for the operand we have added?
if (e.NewItems.Contains(operand))
{
// Complete the task.
tcs.SetCompleted(0);
// Remove the event-handler.
_container.Operands.CollectionChanged -= onChanged;
}
}
// Hook in the handler.
_container.Operands.CollectionChanged += onChanged;
// Perform the addition.
_additions.Add(operand);
// Return the task to be awaited.
return tcs.Task;
}
Логика обработчика событий поднимается в фоновом потоке, накачивая добавляемые сообщения, поэтому нет возможности блокировать потоки переднего плана. Если вы ожидаете добавления в сообщении-насоса для окна, контекст синхронизации достаточно умен, чтобы также планировать продолжение в потоке сообщений-сообщений.
Если вы идете по маршруту Task
или нет, эта стратегия означает, что вы можете безопасно добавлять больше операндов из наблюдаемого события без повторного ввода каких-либо блокировок.
Ответ 2
Я не уверен, что это точно такая же проблема, но при работе с относительно небольшими объемами данных (записи 2k-3k) я использовал приведенный ниже код, чтобы облегчить доступ к перекрестному потоку для чтения/записи для коллекций, привязанных к UI, Этот код изначально нашел здесь.
public class BaseObservableCollection<T> : ObservableCollection<T>
{
// Constructors
public BaseObservableCollection() : base() { }
public BaseObservableCollection(IEnumerable<T> items) : base(items) { }
public BaseObservableCollection(List<T> items) : base(items) { }
// Evnet
public override event NotifyCollectionChangedEventHandler CollectionChanged;
// Event Handler
protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
{
// Be nice - use BlockReentrancy like MSDN said
using (BlockReentrancy())
{
if (CollectionChanged != null)
{
// Walk thru invocation list
foreach (NotifyCollectionChangedEventHandler handler in CollectionChanged.GetInvocationList())
{
DispatcherObject dispatcherObject = handler.Target as DispatcherObject;
// If the subscriber is a DispatcherObject and different thread
if (dispatcherObject != null && dispatcherObject.CheckAccess() == false)
{
// Invoke handler in the target dispatcher thread
dispatcherObject.Dispatcher.Invoke(DispatcherPriority.DataBind, handler, this, e);
}
else
{
// Execute handler as is
handler(this, e);
}
}
}
}
}
}
Я также использовал приведенный ниже код (который наследуется от приведенного выше кода) для поддержки создания события CollectionChanged
, когда элементы внутри коллекции поднимают PropertyChanged
.
public class BaseViewableCollection<T> : BaseObservableCollection<T>
where T : INotifyPropertyChanged
{
// Constructors
public BaseViewableCollection() : base() { }
public BaseViewableCollection(IEnumerable<T> items) : base(items) { }
public BaseViewableCollection(List<T> items) : base(items) { }
// Event Handlers
private void ItemPropertyChanged(object sender, PropertyChangedEventArgs e)
{
var arg = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, sender, sender);
base.OnCollectionChanged(arg);
}
protected override void ClearItems()
{
foreach (T item in Items) { if (item != null) { item.PropertyChanged -= ItemPropertyChanged; } }
base.ClearItems();
}
protected override void InsertItem(int index, T item)
{
if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
base.InsertItem(index, item);
}
protected override void RemoveItem(int index)
{
if (Items[index] != null) { Items[index].PropertyChanged -= ItemPropertyChanged; }
base.RemoveItem(index);
}
protected override void SetItem(int index, T item)
{
if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
base.SetItem(index, item);
}
}
Ответ 3
Синхронизация коллекции кросс-потоков
Привязка привязки ListBox к ObservableCollection, когда данные изменяются, вы обновляете ListBox, потому что INotifyCollectionChanged реализован.
Дефект dell'ObservableCollection заключается в том, что данные могут быть изменены только потоком, который его создал.
SynchronizedCollection не имеет проблемы с Multi-Thread, но не обновляет ListBox, потому что он не реализован INotifyCollectionChanged, даже если вы реализуете INotifyCollectionChanged, CollectionChanged (this, e) можно вызвать только из потока, который его создал.. поэтому он не работает.
Заключение
-Если вы хотите, чтобы список, содержащий автономный монопоток, использовал ObservableCollection
-Если вы хотите, чтобы список не был аутентифицирован, но многопоточным, используйте SynchronizedCollection
- Если вы хотите оба, используйте Framework 4.5, BindingOperations.EnableCollectionSynchronization и ObservableCollection() следующим образом:
/ / Creates the lock object somewhere
private static object _lock = new object () ;
...
/ / Enable the cross acces to this collection elsewhere
BindingOperations.EnableCollectionSynchronization ( _persons , _lock )
Полный образец
http://10rem.net/blog/2012/01/20/wpf-45-cross-thread-collection-synchronization-redux