TaskContinuationOptions.RunContinuationsAsynchronously и Stack Dives

В этот пост в блоге, Stephan Toub описывает новую функцию, которая будет включена в .NET 4.6, которая добавляет еще одно значение TaskCreationOptions и TaskContinuationOptions перечисления, называемые RunContinuationsAsynchronously.

Он объясняет:

"Я говорил о ветвлении вызова методов {Try} Set * на TaskCompletionSource, что любые синхронные продолжения задачи TaskCompletionSources можно запустить синхронно, так как часть вызова. Если бы мы вызывали SetResult здесь, удерживая блокировка, затем будут выполняться синхронные продолжения этой Задачи удерживая замок, и это может привести к очень серьезным проблемам. Итак, удерживая блокировку, мы захватываем объект TaskCompletionSource для завершаться, но мы еще не закончили его, откладывая это до тех пор, пока блокировка была выпущена"

И дает следующий пример для демонстрации:

private SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private async Task WorkAsync()
{
    await _gate.WaitAsync().ConfigureAwait(false);
    try
    {
        // work here
    }
    finally { _gate.Release(); }
}

Теперь представьте, что у вас много вызовов WorkAsync:

await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync());

Мы только что создали 10 000 вызовов WorkAsync, которые будут соответствующим образом сериализуется на семафоре. Одна из задач введите критическую область, а остальные будут стоять в очереди на WaitAsync, внутри SemaphoreSlim эффективно запускает задачу который будет завершен, когда кто-то вызовет Release. Если Release завершил это Задача синхронно, тогда, когда первая задача вызывает Release, она будет синхронно запускать вторую задачу, а когда она вызывает Release, он будет синхронно запускать третью задачу, и поэтому на. Если раздел "//работа здесь" выше кода не содержит ждет, что уступит, тогда мы потенциально собираемся погрузиться в ныряние здесь и в конечном итоге потенциально удалите стек.

Мне сложно усвоить ту часть, где он говорит о продолжении синхронного продолжения.

Вопрос

Как это может привести к погружению в стек? Более того, а что эффективнее делать RunContinuationsAsynchronously для решения этой проблемы?

Ответы

Ответ 1

Ключевым понятием здесь является то, что продолжение задачи может выполняться синхронно в том же потоке, который завершил предшествующую задачу.

Предположим, что это реализация SemaphoreSlim.Release (на самом деле Toub AsyncSemphore):

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        toRelease.SetResult(true); 
}

Мы видим, что он синхронно завершает задачу (используя TaskCompletionSource). В этом случае, если WorkAsync не имеет других асинхронных точек (т.е. No await вообще или все await находятся на уже завершенной задаче), а вызов _gate.Release() может завершить ожидающий вызов _gate.WaitAsync() синхронно на в том же потоке вы можете достичь состояния, в котором один поток последовательно освобождает семафор, завершает следующий ожидающий вызов, выполняет // work here, а затем снова освобождает семафор и т.д. и т.д.

Это означает, что тот же поток идет глубже и глубже в стеке, следовательно, погружение в стек.

RunContinuationsAsynchronously гарантирует, что продолжение не будет выполняться синхронно, и поток, который освобождает семафор, движется дальше, и продолжение запланировано для другого потока (который зависит от других параметров продолжения, например, TaskScheduler)

Это логически напоминает отправку завершения в ThreadPool:

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        Task.Run(() => toRelease.SetResult(true)); 
}

Ответ 2

Как это может привести к погружению в стек? Тем более, что RunContinuationsЭсинхронно эффективно будет делать, чтобы решить эту проблему?

i3arnon предоставляет очень хорошее объяснение причин внедрения RunContinuationsAsynchronously. Мой ответ довольно ортогонален его; на самом деле, я пишу это для своей собственной справки (я сам не собираюсь вспоминать какие-то тонкости этого через полгода:)

Прежде всего, давайте посмотрим , как TaskCompletionSource RunContinuationsAsynchronously опция отличается от Task.Run(() => tcs.SetResult(result)) или подобных. Попробуйте просто консольное приложение:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplications
{
    class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(100, 100);

            Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId });

            var tcs = new TaskCompletionSource<bool>();

            // test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously)
            ContinueWith(1, tcs.Task);
            ContinueWith(2, tcs.Task);
            ContinueWith(3, tcs.Task);

            // test await-style continuations
            ContinueAsync(4, tcs.Task);
            ContinueAsync(5, tcs.Task);
            ContinueAsync(6, tcs.Task);

            Task.Run(() =>
            {
                Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId });
                tcs.TrySetResult(true);
                Thread.Sleep(10000);
            });
            Console.ReadLine();
        }

        // log
        static void Continuation(int id)
        {
            Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId });
            Thread.Sleep(1000);
        }

        // await-style continuation
        static async Task ContinueAsync(int id, Task task)
        {
            await task.ConfigureAwait(false);
            Continuation(id);
        }

        // ContinueWith-style continuation
        static Task ContinueWith(int id, Task task)
        {
            return task.ContinueWith(
                t => Continuation(id),
                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }
    }
}

Обратите внимание, как все продолжения выполняются синхронно в том же потоке, где был вызван TrySetResult:

start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 3 }
{ continuation = 5, CurrentManagedThreadId = 3 }
{ continuation = 6, CurrentManagedThreadId = 3 }

Теперь, если мы не хотим, чтобы это произошло, и мы хотим, чтобы каждое продолжение выполнялось асинхронно (т.е. параллельно с другими продолжениями и, возможно, в другом потоке, в отсутствие какого-либо контекста синхронизации)?

Вот трюк, который мог бы сделать это для продолжения await -стилей, путем установки фальшивого временного контекста синхронизации (подробнее здесь):

public static class TaskExt
{
    class SimpleSynchronizationContext : SynchronizationContext
    {
        internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
    };

    public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
    {
        if (!asyncAwaitContinuations)
        {
            @this.TrySetResult(result);
            return;
        }

        var sc = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
        try
        {
            @this.TrySetResult(result);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(sc);
        }
    }
}

Теперь, используя tcs.TrySetResult(true, asyncAwaitContinuations: true) в нашем тестовом коде:

start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 4 }
{ continuation = 5, CurrentManagedThreadId = 5 }
{ continuation = 6, CurrentManagedThreadId = 6 }

Обратите внимание, что продолжение await продолжается параллельно (хотя и после всех синхронных продолжений ContinueWith).

Эта логика asyncAwaitContinuations: true является взломом и работает только для продолжений await. Новый RunContinuationsAsynchronously заставляет его работать последовательно для любых продолжений, прикрепленных к TaskCompletionSource.Task.

Другим приятным аспектом RunContinuationsAsynchronously является то, что любые продолжения await -типа, которые планируется возобновить в определенном контексте синхронизации, будут выполняться в этом контексте асинхронно (используя SynchronizationContext.Post, даже если TCS.Task завершается в том же контексте ( в отличие от текущего поведения TCS.SetResult). ContinueWith -строчные продолжения будут также выполняться асинхронно их соответствующими планировщиками задач (чаще всего TaskScheduler.Default или TaskScheduler.FromCurrentSynchronizationContext). Они не будут встраиваться через TaskScheduler.TryExecuteTaskInline Я считаю, что Стивен Туб пояснил, что в комментариях к сообщению в блоге, и это также можно увидеть здесь в CoreCLR Task.cs.

Почему мы должны беспокоиться о наложении асинхронии на все продолжения?

Мне обычно это нужно, когда я имею дело с методами async, которые выполняются совместно (совлокальные подпрограммы).

Простым примером является асинхронная обработка с паузой: один асинхронный процесс приостанавливает/возобновляет выполнение другого. Их рабочий процесс выполнения синхронизируется в определенных await точках, а TaskCompletionSource используется для такого рода синхронизации, прямо или косвенно.

Ниже приведен пример готового к использованию примера кода, в котором используется адаптация Stephen Toub PauseTokenSource. Здесь один метод async StartAndControlWorkAsync запускается и периодически приостанавливает/возобновляет другой метод async, DoWorkAsync. Попробуйте изменить asyncAwaitContinuations: true на asyncAwaitContinuations: false и убедитесь, что логика полностью сломана:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main()
        {
            StartAndControlWorkAsync(CancellationToken.None).Wait();
        }

        // Do some work which can be paused/resumed
        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                var step = 0;
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    Console.WriteLine("Working, step: " + step++);
                    await Task.Delay(1000).ConfigureAwait(false);
                    Console.WriteLine("Before await pause.WaitForResumeAsync()");
                    await pause.WaitForResumeAsync();
                    Console.WriteLine("After await pause.WaitForResumeAsync()");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        // Start DoWorkAsync and pause/resume it
        static async Task StartAndControlWorkAsync(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + pts.IsPaused);

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                pts.Resume();
                Console.WriteLine("After resume");
            }
        }

        // Based on Stephen Toub PauseTokenSource
        // http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx
        // the main difference is to make sure that when the consumer-side code - which requested the pause - continues, 
        // the producer-side code has already reached the paused (awaiting) state.
        // E.g. a media player "Pause" button is clicked, gets disabled, playback stops, 
        // and only then "Resume" button gets enabled

        public class PauseTokenSource
        {
            internal static readonly Task s_completedTask = Task.Delay(0);

            readonly object _lock = new Object();

            bool _paused = false;

            TaskCompletionSource<bool> _pauseResponseTcs;
            TaskCompletionSource<bool> _resumeRequestTcs;

            public PauseToken Token { get { return new PauseToken(this); } }

            public bool IsPaused
            {
                get
                {
                    lock (_lock)
                        return _paused;
                }
            }

            // request a resume
            public void Resume()
            {
                TaskCompletionSource<bool> resumeRequestTcs = null;

                lock (_lock)
                {
                    resumeRequestTcs = _resumeRequestTcs;
                    _resumeRequestTcs = null;

                    if (!_paused)
                        return;
                    _paused = false;
                }

                if (resumeRequestTcs != null)
                    resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true);
            }

            // request a pause (completes when paused state confirmed)
            public Task PauseAsync()
            {
                Task responseTask = null;

                lock (_lock)
                {
                    if (_paused)
                        return _pauseResponseTcs.Task;
                    _paused = true;

                    _pauseResponseTcs = new TaskCompletionSource<bool>();
                    responseTask = _pauseResponseTcs.Task;

                    _resumeRequestTcs = null;
                }

                return responseTask;
            }

            // wait for resume request
            internal Task WaitForResumeAsync()
            {
                Task resumeTask = s_completedTask;
                TaskCompletionSource<bool> pauseResponseTcs = null;

                lock (_lock)
                {
                    if (!_paused)
                        return s_completedTask;

                    _resumeRequestTcs = new TaskCompletionSource<bool>();
                    resumeTask = _resumeRequestTcs.Task;

                    pauseResponseTcs = _pauseResponseTcs;

                    _pauseResponseTcs = null;
                }

                if (pauseResponseTcs != null)
                    pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true);

                return resumeTask;
            }
        }

        // consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public bool IsPaused { get { return _source != null && _source.IsPaused; } }

            public Task WaitForResumeAsync()
            {
                return IsPaused ?
                    _source.WaitForResumeAsync() :
                    PauseTokenSource.s_completedTask;
            }
        }


    }

    public static class TaskExt
    {
        class SimpleSynchronizationContext : SynchronizationContext
        {
            internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
        };

        public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
        {
            if (!asyncAwaitContinuations)
            {
                @this.TrySetResult(result);
                return;
            }

            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
            try
            {
                @this.TrySetResult(result);
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }
    }
}

Я не хотел использовать Task.Run(() => tcs.SetResult(result)) здесь, потому что было бы излишним нажимать продолжения на ThreadPool, когда они уже запланированы для асинхронного запуска в потоке пользовательского интерфейса с соответствующим контекстом синхронизации. В то же время, если оба StartAndControlWorkAsync и DoWorkAsync работают в одном и том же контексте синхронизации пользовательского интерфейса, у нас также есть погружение в стек (если tcs.SetResult(result) используется без Task.Run или SynchronizationContext.Post wrapping).

Теперь RunContinuationsAsynchronously, вероятно, является лучшим решением этой проблемы.