Видимый BufferBlock.Post/Receive/ReceiveAsync race/bug

перекрестная ссылка на http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

Я знаю... Я не использую TplDataflow в максимальном потенциале. ATM Я просто использую BufferBlock как безопасную очередь для передачи сообщений, где производитель и потребитель работают с разной скоростью. Я вижу странное поведение, которое оставляет меня в тупике относительно того, как продолжить.

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

В приведенном выше коде (который является частью распределенного решения с разделительной линией 2000), Send вызывается периодически каждые 100 мс или около того. Это означает, что элемент Post ed до messageQueue примерно 10 раз в секунду. Это подтверждено. Однако иногда кажется, что ReceiveAsync не завершается в течение таймаута (т.е. Post не вызывает завершение ReceiveAsync), а TimeoutException возникает после 30 секунд. На данный момент messageQueue.Count находится в сотнях. Это неожиданно. Эта проблема наблюдалась и при более медленных ставках публикации (1 пост/сек) и обычно происходит до того, как 1000 элементов прошли через BufferBlock.

Итак, чтобы обойти эту проблему, я использую следующий код, который работает, но иногда вызывает 1-секундную задержку при приеме (из-за ошибки выше)

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

Это выглядит как условие гонки в TDF для меня, но я не могу понять, почему это не происходит в других местах, где я использую BufferBlock аналогичным образом. Экспериментальное изменение от ReceiveAsync до Receive не помогает. Я не проверял, но я представляю себе изолированно, код выше работает отлично. Это образец, который я видел в документе "Введение в поток данных TPL" tpldataflow.docx.

Что я могу сделать, чтобы понять это? Существуют ли какие-либо показатели, которые могли бы помочь определить, что происходит? Если я не могу создать надежный тестовый пример, какую дополнительную информацию я могу предложить?

Help!

Ответы

Ответ 1

Стивен, кажется, считает следующее решение

var m = ожидание messageQueue.ReceiveAsync();

вместо:

var m = ожидание messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

Можете ли вы подтвердить или опровергнуть это?