TaskContinuationOptions.RunContinuationsAsynchronously и Stack Dives
В этот пост в блоге, Stephan Toub описывает новую функцию, которая будет включена в .NET 4.6, которая добавляет еще одно значение TaskCreationOptions и TaskContinuationOptions перечисления, называемые RunContinuationsAsynchronously
.
Он объясняет:
"Я говорил о ветвлении вызова методов {Try} Set * на TaskCompletionSource, что любые синхронные продолжения задачи TaskCompletionSources можно запустить синхронно, так как часть вызова. Если бы мы вызывали SetResult здесь, удерживая блокировка, затем будут выполняться синхронные продолжения этой Задачи удерживая замок, и это может привести к очень серьезным проблемам. Итак, удерживая блокировку, мы захватываем объект TaskCompletionSource для завершаться, но мы еще не закончили его, откладывая это до тех пор, пока блокировка была выпущена"
И дает следующий пример для демонстрации:
private SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private async Task WorkAsync()
{
await _gate.WaitAsync().ConfigureAwait(false);
try
{
// work here
}
finally { _gate.Release(); }
}
Теперь представьте, что у вас много вызовов WorkAsync:
await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync());
Мы только что создали 10 000 вызовов WorkAsync, которые будут соответствующим образом сериализуется на семафоре. Одна из задач введите критическую область, а остальные будут стоять в очереди на WaitAsync, внутри SemaphoreSlim эффективно запускает задачу который будет завершен, когда кто-то вызовет Release. Если Release завершил это Задача синхронно, тогда, когда первая задача вызывает Release, она будет синхронно запускать вторую задачу, а когда она вызывает Release, он будет синхронно запускать третью задачу, и поэтому на. Если раздел "//работа здесь" выше кода не содержит ждет, что уступит, тогда мы потенциально собираемся погрузиться в ныряние здесь и в конечном итоге потенциально удалите стек.
Мне сложно усвоить ту часть, где он говорит о продолжении синхронного продолжения.
Вопрос
Как это может привести к погружению в стек? Более того, а что эффективнее делать RunContinuationsAsynchronously
для решения этой проблемы?
Ответы
Ответ 1
Ключевым понятием здесь является то, что продолжение задачи может выполняться синхронно в том же потоке, который завершил предшествующую задачу.
Предположим, что это реализация SemaphoreSlim.Release
(на самом деле Toub AsyncSemphore
):
public void Release()
{
TaskCompletionSource<bool> toRelease = null;
lock (m_waiters)
{
if (m_waiters.Count > 0)
toRelease = m_waiters.Dequeue();
else
++m_currentCount;
}
if (toRelease != null)
toRelease.SetResult(true);
}
Мы видим, что он синхронно завершает задачу (используя TaskCompletionSource
).
В этом случае, если WorkAsync
не имеет других асинхронных точек (т.е. No await
вообще или все await
находятся на уже завершенной задаче), а вызов _gate.Release()
может завершить ожидающий вызов _gate.WaitAsync()
синхронно на в том же потоке вы можете достичь состояния, в котором один поток последовательно освобождает семафор, завершает следующий ожидающий вызов, выполняет // work here
, а затем снова освобождает семафор и т.д. и т.д.
Это означает, что тот же поток идет глубже и глубже в стеке, следовательно, погружение в стек.
RunContinuationsAsynchronously
гарантирует, что продолжение не будет выполняться синхронно, и поток, который освобождает семафор, движется дальше, и продолжение запланировано для другого потока (который зависит от других параметров продолжения, например, TaskScheduler
)
Это логически напоминает отправку завершения в ThreadPool
:
public void Release()
{
TaskCompletionSource<bool> toRelease = null;
lock (m_waiters)
{
if (m_waiters.Count > 0)
toRelease = m_waiters.Dequeue();
else
++m_currentCount;
}
if (toRelease != null)
Task.Run(() => toRelease.SetResult(true));
}
Ответ 2
Как это может привести к погружению в стек? Тем более, что RunContinuationsЭсинхронно эффективно будет делать, чтобы решить эту проблему?
i3arnon предоставляет очень хорошее объяснение причин внедрения RunContinuationsAsynchronously
. Мой ответ довольно ортогонален его; на самом деле, я пишу это для своей собственной справки (я сам не собираюсь вспоминать какие-то тонкости этого через полгода:)
Прежде всего, давайте посмотрим , как TaskCompletionSource
RunContinuationsAsynchronously
опция отличается от Task.Run(() => tcs.SetResult(result))
или подобных. Попробуйте просто консольное приложение:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplications
{
class Program
{
static void Main(string[] args)
{
ThreadPool.SetMinThreads(100, 100);
Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId });
var tcs = new TaskCompletionSource<bool>();
// test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously)
ContinueWith(1, tcs.Task);
ContinueWith(2, tcs.Task);
ContinueWith(3, tcs.Task);
// test await-style continuations
ContinueAsync(4, tcs.Task);
ContinueAsync(5, tcs.Task);
ContinueAsync(6, tcs.Task);
Task.Run(() =>
{
Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId });
tcs.TrySetResult(true);
Thread.Sleep(10000);
});
Console.ReadLine();
}
// log
static void Continuation(int id)
{
Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId });
Thread.Sleep(1000);
}
// await-style continuation
static async Task ContinueAsync(int id, Task task)
{
await task.ConfigureAwait(false);
Continuation(id);
}
// ContinueWith-style continuation
static Task ContinueWith(int id, Task task)
{
return task.ContinueWith(
t => Continuation(id),
CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
}
}
Обратите внимание, как все продолжения выполняются синхронно в том же потоке, где был вызван TrySetResult
:
start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 3 }
{ continuation = 5, CurrentManagedThreadId = 3 }
{ continuation = 6, CurrentManagedThreadId = 3 }
Теперь, если мы не хотим, чтобы это произошло, и мы хотим, чтобы каждое продолжение выполнялось асинхронно (т.е. параллельно с другими продолжениями и, возможно, в другом потоке, в отсутствие какого-либо контекста синхронизации)?
Вот трюк, который мог бы сделать это для продолжения await
-стилей, путем установки фальшивого временного контекста синхронизации (подробнее здесь):
public static class TaskExt
{
class SimpleSynchronizationContext : SynchronizationContext
{
internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
};
public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
{
if (!asyncAwaitContinuations)
{
@this.TrySetResult(result);
return;
}
var sc = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
try
{
@this.TrySetResult(result);
}
finally
{
SynchronizationContext.SetSynchronizationContext(sc);
}
}
}
Теперь, используя tcs.TrySetResult(true, asyncAwaitContinuations: true)
в нашем тестовом коде:
start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 4 }
{ continuation = 5, CurrentManagedThreadId = 5 }
{ continuation = 6, CurrentManagedThreadId = 6 }
Обратите внимание, что продолжение await
продолжается параллельно (хотя и после всех синхронных продолжений ContinueWith
).
Эта логика asyncAwaitContinuations: true
является взломом и работает только для продолжений await
. Новый RunContinuationsAsynchronously
заставляет его работать последовательно для любых продолжений, прикрепленных к TaskCompletionSource.Task
.
Другим приятным аспектом RunContinuationsAsynchronously
является то, что любые продолжения await
-типа, которые планируется возобновить в определенном контексте синхронизации, будут выполняться в этом контексте асинхронно (используя SynchronizationContext.Post
, даже если TCS.Task
завершается в том же контексте ( в отличие от текущего поведения TCS.SetResult
). ContinueWith
-строчные продолжения будут также выполняться асинхронно их соответствующими планировщиками задач (чаще всего TaskScheduler.Default
или TaskScheduler.FromCurrentSynchronizationContext
). Они не будут встраиваться через TaskScheduler.TryExecuteTaskInline
Я считаю, что Стивен Туб пояснил, что в комментариях к сообщению в блоге, и это также можно увидеть здесь в CoreCLR Task.cs.
Почему мы должны беспокоиться о наложении асинхронии на все продолжения?
Мне обычно это нужно, когда я имею дело с методами async
, которые выполняются совместно (совлокальные подпрограммы).
Простым примером является асинхронная обработка с паузой: один асинхронный процесс приостанавливает/возобновляет выполнение другого. Их рабочий процесс выполнения синхронизируется в определенных await
точках, а TaskCompletionSource
используется для такого рода синхронизации, прямо или косвенно.
Ниже приведен пример готового к использованию примера кода, в котором используется адаптация Stephen Toub PauseTokenSource
. Здесь один метод async
StartAndControlWorkAsync
запускается и периодически приостанавливает/возобновляет другой метод async
, DoWorkAsync
. Попробуйте изменить asyncAwaitContinuations: true
на asyncAwaitContinuations: false
и убедитесь, что логика полностью сломана:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp
{
class Program
{
static void Main()
{
StartAndControlWorkAsync(CancellationToken.None).Wait();
}
// Do some work which can be paused/resumed
public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
{
try
{
var step = 0;
while (true)
{
token.ThrowIfCancellationRequested();
Console.WriteLine("Working, step: " + step++);
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine("Before await pause.WaitForResumeAsync()");
await pause.WaitForResumeAsync();
Console.WriteLine("After await pause.WaitForResumeAsync()");
}
}
catch (Exception e)
{
Console.WriteLine("Exception: {0}", e);
throw;
}
}
// Start DoWorkAsync and pause/resume it
static async Task StartAndControlWorkAsync(CancellationToken token)
{
var pts = new PauseTokenSource();
var task = DoWorkAsync(pts.Token, token);
while (true)
{
token.ThrowIfCancellationRequested();
Console.WriteLine("Press enter to pause...");
Console.ReadLine();
Console.WriteLine("Before pause requested");
await pts.PauseAsync();
Console.WriteLine("After pause requested, paused: " + pts.IsPaused);
Console.WriteLine("Press enter to resume...");
Console.ReadLine();
Console.WriteLine("Before resume");
pts.Resume();
Console.WriteLine("After resume");
}
}
// Based on Stephen Toub PauseTokenSource
// http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx
// the main difference is to make sure that when the consumer-side code - which requested the pause - continues,
// the producer-side code has already reached the paused (awaiting) state.
// E.g. a media player "Pause" button is clicked, gets disabled, playback stops,
// and only then "Resume" button gets enabled
public class PauseTokenSource
{
internal static readonly Task s_completedTask = Task.Delay(0);
readonly object _lock = new Object();
bool _paused = false;
TaskCompletionSource<bool> _pauseResponseTcs;
TaskCompletionSource<bool> _resumeRequestTcs;
public PauseToken Token { get { return new PauseToken(this); } }
public bool IsPaused
{
get
{
lock (_lock)
return _paused;
}
}
// request a resume
public void Resume()
{
TaskCompletionSource<bool> resumeRequestTcs = null;
lock (_lock)
{
resumeRequestTcs = _resumeRequestTcs;
_resumeRequestTcs = null;
if (!_paused)
return;
_paused = false;
}
if (resumeRequestTcs != null)
resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true);
}
// request a pause (completes when paused state confirmed)
public Task PauseAsync()
{
Task responseTask = null;
lock (_lock)
{
if (_paused)
return _pauseResponseTcs.Task;
_paused = true;
_pauseResponseTcs = new TaskCompletionSource<bool>();
responseTask = _pauseResponseTcs.Task;
_resumeRequestTcs = null;
}
return responseTask;
}
// wait for resume request
internal Task WaitForResumeAsync()
{
Task resumeTask = s_completedTask;
TaskCompletionSource<bool> pauseResponseTcs = null;
lock (_lock)
{
if (!_paused)
return s_completedTask;
_resumeRequestTcs = new TaskCompletionSource<bool>();
resumeTask = _resumeRequestTcs.Task;
pauseResponseTcs = _pauseResponseTcs;
_pauseResponseTcs = null;
}
if (pauseResponseTcs != null)
pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true);
return resumeTask;
}
}
// consumer side
public struct PauseToken
{
readonly PauseTokenSource _source;
public PauseToken(PauseTokenSource source) { _source = source; }
public bool IsPaused { get { return _source != null && _source.IsPaused; } }
public Task WaitForResumeAsync()
{
return IsPaused ?
_source.WaitForResumeAsync() :
PauseTokenSource.s_completedTask;
}
}
}
public static class TaskExt
{
class SimpleSynchronizationContext : SynchronizationContext
{
internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
};
public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
{
if (!asyncAwaitContinuations)
{
@this.TrySetResult(result);
return;
}
var sc = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
try
{
@this.TrySetResult(result);
}
finally
{
SynchronizationContext.SetSynchronizationContext(sc);
}
}
}
}
Я не хотел использовать Task.Run(() => tcs.SetResult(result))
здесь, потому что было бы излишним нажимать продолжения на ThreadPool
, когда они уже запланированы для асинхронного запуска в потоке пользовательского интерфейса с соответствующим контекстом синхронизации. В то же время, если оба StartAndControlWorkAsync
и DoWorkAsync
работают в одном и том же контексте синхронизации пользовательского интерфейса, у нас также есть погружение в стек (если tcs.SetResult(result)
используется без Task.Run
или SynchronizationContext.Post
wrapping).
Теперь RunContinuationsAsynchronously
, вероятно, является лучшим решением этой проблемы.