Есть ли что-то вроде асинхронного BlockingCollection <T>?
Я хотел бы await
по результату BlockingCollection<T>.Take()
асинхронно, поэтому я не блокирую поток. Ищете что-нибудь вроде этого:
var item = await blockingCollection.TakeAsync();
Я знаю, что смогу сделать это:
var item = await Task.Run(() => blockingCollection.Take());
но этот вид убивает всю идею, потому что вместо этого блокируется другой поток (ThreadPool
).
Есть ли альтернатива?
Ответы
Ответ 1
Есть четыре варианта, о которых я знаю.
Первый - это Каналы, которые предоставляют потокобезопасную очередь, которая поддерживает асинхронные операции Read
и Write
. Каналы высоко оптимизированы и при необходимости поддерживают удаление некоторых элементов при достижении порога.
Следующим является BufferBlock<T>
из потока данных TPL. Если у вас есть только один потребитель, вы можете использовать OutputAvailableAsync
или ReceiveAsync
или просто связать его с ActionBlock<T>
. Для получения дополнительной информации см. Мой блог.
Последние два типа, которые я создал, доступны в моей библиотеке AsyncEx.
AsyncCollection<T>
является async
почти эквивалентным BlockingCollection<T>
, способным обернуть параллельную коллекцию производителей/потребителей, такую как ConcurrentQueue<T>
или ConcurrentBag<T>
. Вы можете использовать TakeAsync
для асинхронного потребления элементов из коллекции. Для получения дополнительной информации см. Мой блог.
AsyncProducerConsumerQueue<T>
- это более переносимая очередь async
-compatible производителя/потребителя. Вы можете использовать DequeueAsync
для асинхронного потребления элементов из очереди. Для получения дополнительной информации см. Мой блог.
Последние три из этих альтернатив допускают синхронные и асинхронные размещения и получения.
Ответ 2
... или вы можете сделать это:
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class AsyncQueue<T>
{
private readonly SemaphoreSlim _sem;
private readonly ConcurrentQueue<T> _que;
public AsyncQueue()
{
_sem = new SemaphoreSlim(0);
_que = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
_que.Enqueue(item);
_sem.Release();
}
public void EnqueueRange(IEnumerable<T> source)
{
var n = 0;
foreach (var item in source)
{
_que.Enqueue(item);
n++;
}
_sem.Release(n);
}
public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
{
for (; ; )
{
await _sem.WaitAsync(cancellationToken);
T item;
if (_que.TryDequeue(out item))
{
return item;
}
}
}
}
Простая, полностью функциональная асинхронная очередь FIFO.
Примечание: SemaphoreSlim.WaitAsync
был добавлен в .NET 4.5 до этого, это было не так просто.
Ответ 3
Если вы не против небольшого взлома, попробуйте эти расширения.
public static async Task AddAsync<TEntity>(
this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
while (true)
{
try
{
if (Bc.TryAdd(item, 0, abortCt))
return;
else
await Task.Delay(100, abortCt);
}
catch (Exception)
{
throw;
}
}
}
public static async Task<TEntity> TakeAsync<TEntity>(
this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
while (true)
{
try
{
TEntity item;
if (Bc.TryTake(out item, 0, abortCt))
return item;
else
await Task.Delay(100, abortCt);
}
catch (Exception)
{
throw;
}
}
}
Ответ 4
Вот очень базовая реализация BlockingCollection
, которая поддерживает ожидание с большим количеством отсутствующих функций. Он использует библиотеку AsyncEnumerable
, которая делает возможным асинхронное перечисление для версий С# старше 8.0.
public class AsyncBlockingCollection<T>
{ // Missing features: cancellation, boundedCapacity, TakeAsync
private Queue<T> _queue = new Queue<T>();
private SemaphoreSlim _semaphore = new SemaphoreSlim(0);
private int _consumersCount = 0;
private bool _isAddingCompleted;
public void Add(T item)
{
lock (_queue)
{
if (_isAddingCompleted) throw new InvalidOperationException();
_queue.Enqueue(item);
}
_semaphore.Release();
}
public void CompleteAdding()
{
lock (_queue)
{
if (_isAddingCompleted) return;
_isAddingCompleted = true;
if (_consumersCount > 0) _semaphore.Release(_consumersCount);
}
}
public IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
return new AsyncEnumerable<T>(async yield =>
{
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) await yield.ReturnAsync(item);
}
});
}
}
Пример использования:
var abc = new AsyncBlockingCollection<int>();
var producer = Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
await Task.Delay(100);
abc.Add(i);
}
abc.CompleteAdding();
});
var consumer = Task.Run(async () =>
{
await abc.GetConsumingEnumerable().ForEachAsync(async item =>
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
});
});
await Task.WhenAll(producer, consumer);
Выход:
1 2 3 4 5 6 7 8 9 10
Обновление: С выпуском С# 8 асинхронное перечисление asynchronous enumeration стало встроенной функцией языка. Обязательные классы (IAsyncEnumerable
, IAsyncEnumerator
) встроены в .NET Core 3.0 и предлагаются в виде пакета для .NET Framework 4.6. 1+ (Microsoft.Bcl.AsyncInterfaces).
Вот альтернативная реализация GetConsumingEnumerable
с новым синтаксисом С# 8:
public async IAsyncEnumerable<T> GetConsumingEnumerable()
{
lock (_queue) _consumersCount++;
while (true)
{
lock (_queue)
{
if (_queue.Count == 0 && _isAddingCompleted) break;
}
await _semaphore.WaitAsync();
bool hasItem;
T item = default;
lock (_queue)
{
hasItem = _queue.Count > 0;
if (hasItem) item = _queue.Dequeue();
}
if (hasItem) yield return item;
}
}
Обратите внимание на сосуществование await
и yield
в одном методе.
Пример использования (С# 8):
var consumer = Task.Run(async () =>
{
await foreach (var item in abc.GetConsumingEnumerable())
{
await Task.Delay(200);
await Console.Out.WriteAsync(item + " ");
}
});
Обратите внимание на await
перед foreach
.