Вызов TaskCompletionSource.SetResult неблокирующим образом
Я обнаружил, что TaskCompletionSource.SetResult();
вызывает код, ожидающий задачи, перед возвратом. В моем случае это приводит к тупиковой ситуации.
Это упрощенная версия, которая запускается в обычном Thread
void ReceiverRun()
while (true)
{
var msg = ReadNextMessage();
TaskCompletionSource<Response> task = requests[msg.RequestID];
if(msg.Error == null)
task.SetResult(msg);
else
task.SetException(new Exception(msg.Error));
}
}
"асинхронная" часть кода выглядит примерно так.
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Ожидание фактически вложено внутри неасинхронных вызовов.
Отклик SendAwaitResponse (упрощенный)
public static Task<Response> SendAwaitResponse(string msg)
{
var t = new TaskCompletionSource<Response>();
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.Task;
}
Мое предположение заключалось в том, что второй SendAwaitResponse будет выполняться в потоке ThreadPool, но он продолжается в потоке, созданном для ReceiverRun.
Есть ли способ установить результат задачи без продолжения ожидаемого кода?
Приложение представляет собой консольное приложение .
Ответы
Ответ 1
Я обнаружил, что TaskCompletionSource.SetResult(); вызывает код, ожидающий задачи перед возвратом. В моем случае это приводит к тупиковой ситуации.
Да, у меня есть запись в блоге, документирующая это (AFAIK он не задокументирован на MSDN). Тупик происходит из-за двух вещей:
- Смесь
async
и код блокировки (т.е. метод async
вызывает Wait
).
- Выполнение задач запланировано с помощью
TaskContinuationOptions.ExecuteSynchronously
.
Я рекомендую начать с самого простого решения: удалить первое (1). I.e., не смешивайте вызовы async
и Wait
:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Вместо этого используйте await
последовательно:
await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
Если вам нужно, вы можете Wait
в альтернативном месте дальше вверх по стеку вызовов (не в методе async
).
Это мое самое рекомендуемое решение. Однако, если вы хотите попытаться удалить вторую вещь (2), вы можете сделать несколько трюков: либо оберните SetResult
в Task.Run
, чтобы заставить его на отдельный поток (my библиотека AsyncEx имеет *WithBackgroundContinuations
методы расширения, которые делают именно это), или дать вашему потоку реальный контекст (например, AsyncContext
type) и укажите ConfigureAwait(false)
, который будет заставляет игнорировать флаг ExecuteSynchronously
.
Но эти решения гораздо сложнее, чем просто разделение async
и кода блокировки.
В качестве примечания обратите внимание на TPL Dataflow; это похоже на то, что вы можете найти это полезным.
Ответ 2
Поскольку ваше приложение представляет собой консольное приложение, оно работает по умолчанию контексту синхронизации, в котором будет вызываться обратный вызов продолжения await
тот же поток ожидание выполнения задачи завершается. Если вы хотите переключать потоки после await SendAwaitResponse
, вы можете сделать это с помощью await Task.Yield()
:
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
Вы можете улучшить это, сохранив Thread.CurrentThread.ManagedThreadId
внутри Task.Result
и сравнив его с текущим идентификатором потока после await
. Если вы все еще находитесь в одном потоке, выполните await Task.Yield()
.
Хотя я понимаю, что SendAwaitResponse
- это упрощенная версия вашего реального кода, она все еще полностью синхронна внутри (как вы ее показали в своем вопросе). Почему вы ожидаете, что там будет какой-нибудь переключатель потока?
В любом случае, вы, вероятно, должны перепроектировать свою логику так, как она не делает предположений о том, в какой поток вы сейчас находитесь. Избегайте смешивания await
и Task.Wait()
и сделайте весь ваш асинхронный код. Обычно возможно придерживаться только одного Wait()
где-то на верхнем уровне (например, внутри Main
).
[EDITED] Вызов task.SetResult(msg)
из ReceiverRun
фактически передает поток управления в точку, где вы await
на task
- без переключателя потока, из-за синхронизации по умолчанию контекстное поведение. Таким образом, ваш код, который обрабатывает фактическую обработку сообщений, использует поток ReceiverRun
. В конце концов, SendAwaitResponse("second message").Wait()
вызывается в том же потоке, вызывая тупик.
Ниже приведен код консоли, смоделированный после вашего образца. Он использует await Task.Yield()
внутри ProcessAsync
, чтобы запланировать продолжение в отдельном потоке, поэтому поток управления возвращается к ReceiverRun
, и нет взаимоблокировки.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
Это не сильно отличается от выполнения Task.Run(() => task.SetResult(msg))
внутри ReceiverRun
. Единственное преимущество, о котором я могу думать, это то, что у вас есть явный контроль над переключением потоков. Таким образом, вы можете оставаться в одном потоке как можно дольше (например, для task2
, task3
, task4
, но вам по-прежнему нужен другой переключатель потока после task4
, чтобы избежать тупика на task5.Wait()
).
Оба решения в конечном итоге заставят пул потоков расти, что плохо с точки зрения производительности и масштабируемости.
Теперь, если мы заменим Task.Wait()
на await task
всюду внутри ProcessAsync
в приведенном выше коде, нам не придется использовать await Task.Yield
, и до сих пор не будет никаких взаимоблокировок. Однако вся цепочка вызовов await
после первого await task1
внутри ProcessAsync
будет фактически выполняться в потоке ReceiverRun
. Пока мы не блокируем этот поток другими тэгами Wait()
и не выполняем много работы с процессором, поскольку мы обрабатываем сообщения, этот подход может работать нормально (асинхронный IO-bound await
-style все еще должны быть в порядке, и они могут фактически вызвать неявный переключатель потоков).
Тем не менее, я думаю, вам понадобится отдельный поток с установленным на нем сериализующим контекстом синхронизации для обработки сообщений (аналогично WindowsFormsSynchronizationContext
). Это, где должен работать ваш асинхронный код, содержащий awaits
. Вам все равно нужно избегать использования Task.Wait
в этом потоке. И если обработка отдельных сообщений требует много работы с привязкой к процессору, вы должны использовать Task.Run
для такой работы. Для вызовов async IO-bound вы можете оставаться в одном потоке.
Вы можете посмотреть ActionDispatcher
/ActionDispatcherSynchronizationContext
из @StephenCleary
Нито-асинхронная библиотека для вашей асинхронной логики обработки сообщений. Надеюсь, Стивен вскочит и даст лучший ответ.
Ответ 3
"Мое предположение заключалось в том, что второй поток SendAwaitResponse будет выполняться в потоке ThreadPool, но он продолжается в потоке, созданном для ReceiverRun".
Все зависит от того, что вы делаете в SendAwaitResponse. Асинхронность и concurrency не то же самое.
Отъезд: С# 5 Async/Await - это * одновременный *?