Параллельный HashSet <T> в .NET Framework?
У меня есть следующий класс.
class Test{
public HashSet<string> Data = new HashSet<string>();
}
Мне нужно изменить поле "Данные" из разных потоков, поэтому мне хотелось бы получить некоторые мнения о моей текущей поточно-безопасной реализации.
class Test{
public HashSet<string> Data = new HashSet<string>();
public void Add(string Val){
lock(Data) Data.Add(Val);
}
public void Remove(string Val){
lock(Data) Data.Remove(Val);
}
}
Есть ли лучшее решение, чтобы перейти непосредственно к полю и защитить его от параллельного доступа несколькими потоками?
Ответы
Ответ 1
Ваша реализация верна. К сожалению,.NET Framework не предоставляет встроенный параллельный тип hashset. Тем не менее, есть некоторые обходные пути.
ConcurrentDictionary (рекомендуется)
Это первый пример использования класса ConcurrentDictionary<TKey, TValue>
в пространстве имен System.Collections.Concurrent
. В этом случае значение бессмысленно, поэтому мы можем использовать простой byte
(1 байт в памяти).
private ConcurrentDictionary<string, byte> _data;
Это рекомендуемый параметр, поскольку тип является потокобезопасным и предоставляет вам те же преимущества, что и HashSet<T>
, кроме ключа и значения - разные объекты.
Источник: Социальный MSDN
ConcurrentBag
Если вы не возражаете о повторяющихся записях, вы можете использовать класс ConcurrentBag<T>
в том же пространстве имен предыдущего класса.
private ConcurrentBag<string> _data;
Само-реализация
Наконец, как и вы, вы можете реализовать свой собственный тип данных, используя блокировку или другие способы, с помощью которых .NET предоставляет вам потокобезопасность. Вот отличный пример: Как реализовать ConcurrentHashSet в .Net
Единственным недостатком этого решения является то, что тип HashSet<T>
не имеет официального параллельного доступа даже для операций чтения.
Я цитирую код связанного сообщения (первоначально написанного Бен Мошером).
using System.Collections.Generic;
using System.Threading;
namespace BlahBlah.Utilities
{
public class ConcurrentHashSet<T> : IDisposable
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();
#region Implementation of ICollection<T> ...ish
public bool Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _hashSet.Count;
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
}
#endregion
#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
~ConcurrentHashSet()
{
Dispose(false);
}
#endregion
}
}
РЕДАКТИРОВАТЬ:. Переместите методы блокировки входа с помощью блоков try
, поскольку они могут генерировать исключение и выполнять инструкции, содержащиеся в блоках finally
.
Ответ 2
Вместо обертывания ConcurrentDictionary
или блокировки над HashSet
я создал фактический ConcurrentHashSet
на основе ConcurrentDictionary
.
Эта реализация поддерживает основные операции для каждого элемента без HashSet
задает операции, поскольку они имеют меньшее значение в параллельных сценариях. IMO:
var concurrentHashSet = new ConcurrentHashSet<string>(
new[]
{
"hamster",
"HAMster",
"bar",
},
StringComparer.OrdinalIgnoreCase);
concurrentHashSet.TryRemove("foo");
if (concurrentHashSet.Contains("BAR"))
{
Console.WriteLine(concurrentHashSet.Count);
}
Выход: 2
Вы можете получить это от NuGet здесь и посмотреть источник на GitHub здесь.
Ответ 3
Поскольку никто не упомянул об этом, я предлагаю альтернативный подход, который может быть или не подходит для вашей конкретной цели:
Неограниченные коллекции Microsoft
От сообщения в блоге от команды MS:
В то время как создание и запуск одновременно проще, чем когда-либо, одна из основных проблем все еще существует: изменяемое разделяемое состояние. Чтение из нескольких потоков, как правило, очень просто, но как только состояние необходимо обновить, оно становится намного сложнее, особенно в проектах, требующих блокировки.
Альтернативой блокировке является использование неизменяемого состояния. Неизменяемые структуры данных гарантированно никогда не будут меняться и, таким образом, могут свободно передаваться между различными потоками, не беспокоясь о том, чтобы наступить на кого-то elses toes.
Этот проект создает новую проблему: как вы управляете изменениями состояния без копирования всего состояния каждый раз? Это особенно сложно, когда задействованы коллекции.
В это место входят неизменные коллекции.
Эти коллекции включают ImmutableHashSet <T> и ImmutableList <T> .
Производительность
Поскольку неизменяемые коллекции используют структуры дерева данных, чтобы обеспечить структурный обмен, их характеристики производительности отличаются от изменяемых коллекций. При сравнении с блокирующей изменчивой коллекцией результаты будут зависеть от конфликтов конкуренции и шаблонов доступа. Однако, взятый из другого сообщения в блоге об неизменных коллекциях:
Q: Я слышал, что неизменные коллекции медленны. Это разные? Могу ли я использовать их, когда важна производительность или память?
A: Эти неизменные коллекции были сильно настроены, чтобы иметь конкурентоспособные характеристики производительности для изменяемых коллекций при балансировании обмена памятью. В некоторых случаях они почти так же быстро, как и изменчивые коллекции, как алгоритмически, так и в реальном времени, иногда даже быстрее, в то время как в других случаях они алгоритмически более сложны. Однако во многих случаях разница будет незначительной. Как правило, вы должны использовать простейший код, чтобы выполнить задание, а затем настроиться на производительность по мере необходимости. Неизменяемые коллекции помогают писать простой код, особенно если необходимо учитывать безопасность потока.
Другими словами, во многих случаях разница не будет заметной, и вы должны пойти по более простому выбору - что для параллельных множеств будет использовать ImmutableHashSet<T>
, так как у вас нет существующей реализации блокировки!: -)
Ответ 4
Я предпочитаю полные решения, поэтому я сделал это: помните, что мой граф реализован по-другому, потому что я не понимаю, почему запрещается читать хешсет при попытке подсчитать его значения.
@Zen, Спасибо, что начал.
[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();
public ConcurrentHashSet()
{
}
public ConcurrentHashSet(IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(comparer);
}
public ConcurrentHashSet(IEnumerable<T> collection)
{
_hashSet = new HashSet<T>(collection);
}
public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(collection, comparer);
}
protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
{
_hashSet = new HashSet<T>();
// not sure about this one really...
var iSerializable = _hashSet as ISerializable;
iSerializable.GetObjectData(info, context);
}
#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
public IEnumerator<T> GetEnumerator()
{
return _hashSet.GetEnumerator();
}
~ConcurrentHashSet()
{
Dispose(false);
}
public void OnDeserialization(object sender)
{
_hashSet.OnDeserialization(sender);
}
public void GetObjectData(SerializationInfo info, StreamingContext context)
{
_hashSet.GetObjectData(info, context);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
#endregion
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
_hashSet.Add(item);
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void UnionWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.UnionWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void IntersectWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.IntersectWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void ExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.ExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void SymmetricExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
_hashSet.SymmetricExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsProperSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsProperSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Overlaps(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Overlaps(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool SetEquals(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.SetEquals(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
bool ISet<T>.Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterWriteLock();
try
{
_hashSet.CopyTo(array, arrayIndex);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public int Count
{
get
{
_lock.EnterWriteLock();
try
{
return _hashSet.Count;
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
}
Ответ 5
Сложная часть создания ISet<T>
concurrent заключается в том, что установленные методы (объединение, пересечение, разность) являются итеративными по своей природе. По крайней мере, вам нужно перебирать все n элементов одного из множеств, участвующих в операции, одновременно блокируя оба набора.
Вы теряете преимущества ConcurrentDictionary<T,byte>
, когда вам нужно заблокировать весь набор во время итерации. Без блокировки эти операции не являются потокобезопасными.
Учитывая добавленные накладные расходы ConcurrentDictionary<T,byte>
, вероятно, разумнее использовать более легкий вес HashSet<T>
и просто окружать все в замках.
Если вам не нужны заданные операции, используйте ConcurrentDictionary<T,byte>
и просто используйте default(byte)
как значение при добавлении ключей.