Ответ 1
Мне кажется, что вы хотите что-то очень похожее на BlockingCollection<T>
, в котором вместо блокировки используются Task
и await
ing.
В частности, что-то, что вы можете добавить без блокировки или ожидания. Но когда вы пытаетесь удалить элемент, когда на данный момент его нет, вы можете await
до тех пор, пока не будет доступен какой-либо элемент.
Открытый интерфейс может выглядеть так:
public class AsyncQueue<T>
{
public bool IsCompleted { get; }
public Task<T> DequeueAsync();
public void Enqueue(T item);
public void FinishAdding();
}
FinishAdding()
необходимо, чтобы мы знали, когда заканчивать dequeuing.
При этом ваш код может выглядеть так (m_queue
is AsyncQueue<File>
):
var tasks = Enumerable.Range(0, 10)
.Select(i => DownloadAndEnqueue(i))
.ToArray();
Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());
…
static async Task DownloadAndEnqueue(string url)
{
m_queue.Enqueue(await DownloadFile(url));
}
Это не так хорошо, как вы могли себе представить, но он должен работать.
И реализация AsyncQueue<T>
? Есть две очереди. Один из них - для завершенной работы, которая еще не была удалена. Другой - для Task
(фактически, TaskCompletionSource<T>
), которые уже были отменены, но которые пока не имеют результата.
Когда вы удаляете и завершаете работу в очереди, просто верните работу оттуда (используя Task.FromResult()
). Если очередь пуста, создайте новую Task
, добавьте ее в другую очередь и верните ее.
Когда вы завершаете какую-то завершенную работу, а в очереди есть Task
, удалите ее и завершите с помощью полученного результата. Если очередь Task
пуста, добавьте работу в первую очередь.
С помощью этого вы можете удалить и вывести из очереди столько раз, сколько захотите, и он будет работать правильно. Когда вы знаете, что новой работы не будет, вызовите FinishAdding()
. Если есть ожидающие Task
s, они будут генерировать исключение.
Другими словами:
public class AsyncQueue<T>
{
private readonly object m_lock = new object();
private bool m_finishedAdding = false;
private readonly Queue<T> m_overflowQueue = new Queue<T>();
private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
new Queue<TaskCompletionSource<T>>();
public bool IsCompleted
{
get { return m_finishedAdding && m_overflowQueue.Count == 0; }
}
public Task<T> DequeueAsync()
{
Task<T> result;
lock (m_lock)
{
if (m_overflowQueue.Count > 0)
result = Task.FromResult(m_overflowQueue.Dequeue());
else if (!m_finishedAdding)
{
var tcs = new TaskCompletionSource<T>();
m_underflowQueue.Enqueue(tcs);
result = tcs.Task;
}
else
throw new InvalidOperationException();
}
return result;
}
public void Enqueue(T item)
{
lock (m_lock)
{
if (m_finishedAdding)
throw new InvalidOperationException();
if (m_underflowQueue.Count > 0)
{
var tcs = m_underflowQueue.Dequeue();
tcs.SetResult(item);
}
else
m_overflowQueue.Enqueue(item);
}
}
public void FinishAdding()
{
lock (m_lock)
{
m_finishedAdding = true;
while (m_underflowQueue.Count > 0)
{
var tcs = m_underflowQueue.Dequeue();
tcs.SetException(new InvalidOperationException());
}
}
}
}
Если вы хотите ограничить размер рабочей очереди (и, таким образом, ограничить производителей, если они слишком быстры), вы можете сделать Enqueue()
return Task
тоже, что потребует еще одну очередь.