Ответ 1
Стивен, кажется, считает следующее решение
var m = ожидание messageQueue.ReceiveAsync();
вместо:
var m = ожидание messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
Можете ли вы подтвердить или опровергнуть это?
перекрестная ссылка на http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
Я знаю... Я не использую TplDataflow в максимальном потенциале. ATM Я просто использую BufferBlock
как безопасную очередь для передачи сообщений, где производитель и потребитель работают с разной скоростью. Я вижу странное поведение, которое оставляет меня в тупике относительно того, как
продолжить.
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
В приведенном выше коде (который является частью распределенного решения с разделительной линией 2000), Send
вызывается периодически каждые 100 мс или около того. Это означает, что элемент Post
ed до messageQueue
примерно 10 раз в секунду. Это подтверждено. Однако иногда кажется, что ReceiveAsync
не завершается в течение таймаута (т.е. Post
не вызывает завершение ReceiveAsync
), а TimeoutException
возникает после 30 секунд. На данный момент messageQueue.Count
находится в сотнях. Это неожиданно. Эта проблема наблюдалась и при более медленных ставках публикации (1 пост/сек) и обычно происходит до того, как 1000 элементов прошли через BufferBlock
.
Итак, чтобы обойти эту проблему, я использую следующий код, который работает, но иногда вызывает 1-секундную задержку при приеме (из-за ошибки выше)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Это выглядит как условие гонки в TDF для меня, но я не могу понять, почему это не происходит в других местах, где я использую BufferBlock
аналогичным образом. Экспериментальное изменение от ReceiveAsync
до Receive
не помогает. Я не проверял, но я представляю себе изолированно, код выше работает отлично. Это образец, который я видел в документе "Введение в поток данных TPL" tpldataflow.docx.
Что я могу сделать, чтобы понять это? Существуют ли какие-либо показатели, которые могли бы помочь определить, что происходит? Если я не могу создать надежный тестовый пример, какую дополнительную информацию я могу предложить?
Help!
Стивен, кажется, считает следующее решение
var m = ожидание messageQueue.ReceiveAsync();
вместо:
var m = ожидание messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
Можете ли вы подтвердить или опровергнуть это?