BufferBlock тупик с OutputAvailableAsync после TryReceiveAll
Работая над ответом на этот вопрос, я написал этот фрагмент:
var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
while (true)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
buffer.Post(null);
Console.WriteLine("Post " + buffer.Count);
}
});
var consumer = Task.Run(async () =>
{
while (await buffer.OutputAvailableAsync())
{
IList<object> items;
buffer.TryReceiveAll(out items);
Console.WriteLine("TryReceiveAll " + buffer.Count);
}
});
await Task.WhenAll(consumer, producer);
Продюсер должен отправлять элементы в буфер каждые 100 мс, и потребитель должен очистить все элементы из буфера и асинхронно дождаться появления большего количества элементов.
Что на самом деле происходит, так это то, что производитель очищает все элементы один раз, а затем никогда больше не выходит за пределы OutputAvailableAsync
. Если я переключаю пользователя на удаление элементов один за другим, он работает как исключенный:
while (await buffer.OutputAvailableAsync())
{
object item;
while (buffer.TryReceive(out item)) ;
}
Я что-то не понимаю? Если нет, в чем проблема?
Ответы
Ответ 1
Это ошибка в SourceCore
, которая используется внутри BufferBlock
. Его метод TryReceiveAll
не включает элемент _enableOffering
boolean data, а TryReceive
делает. Это приводит к возврату задачи из OutputAvailableAsync
.
Здесь минимальное воспроизведение:
var buffer = new BufferBlock<object>();
buffer.Post(null);
IList<object> items;
buffer.TryReceiveAll(out items);
var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);
await outputAvailableAsync; // Never completes
Я только что зафиксировал его в реестре .Net core с этим запросом на перенос. Надеемся, что исправление вскоре появится в пакете nuget.
Ответ 2
Увы, это конец сентября 2015 года, и хотя i3arnon исправил ошибку, она не была решена в версии, выпущенной через два дня после ошибки исправлено: Microsoft TPL Dataflow версии 4.5.24.
Однако IReceivableSourceBlock.TryReceive(...) работает правильно.
Метод расширения решит проблему. После новой версии TPL Dataflow будет легко изменить метод расширения.
/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
/* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
* Hence this function uses TryReceive until nothing is available anymore
* */
IList<T> receivedItems = new List<T>();
T receivedItem = default(T);
while (buffer.TryReceive<T>(out receivedItem))
{
receivedItems.Add(receivedItem);
}
return receivedItems;
}
использование:
while (await this.bufferBlock.OutputAvailableAsync())
{
// some data available
var receivedItems = this.bufferBlock.TryReceiveAllEx();
if (receivedItems.Any())
{
ProcessReceivedItems(bufferBlock);
}
}