Как задачи в параллельной библиотеке задач влияют на ActivityID?

Перед использованием параллельной библиотеки задач я часто использовал CorrelationManager.ActivityId для отслеживания отчетов трассировки/ошибок с несколькими потоками.

ActivityId хранится в локальном хранилище потоков, поэтому каждый поток получает свою собственную копию. Идея заключается в том, что при запуске потока (активности) вы назначаете новый ActivityId. ActivityId будет записываться в журналы с любой другой информацией о трассировке, позволяя выделить информацию о трассировке для одного "Активность". Это действительно полезно с WCF, поскольку ActivityId может переноситься на компонент службы.

Вот пример того, что я говорю:

static void Main(string[] args)
{
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
    {
        DoWork();
    }));
}

static void DoWork()
{
    try
    {
        Trace.CorrelationManager.ActivityId = Guid.NewGuid();
        //The functions below contain tracing which logs the ActivityID.
        CallFunction1();
        CallFunction2();
        CallFunction3();
    }
    catch (Exception ex)
    {
        Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString());
    }
}

Теперь, с помощью TPL, я понимаю, что несколько задач разделяют потоки. Означает ли это, что ActivityId подвержен повторной инициализации средней задачи (по другой задаче)? Есть ли новый механизм для отслеживания активности?

Ответы

Ответ 1

Я провел несколько экспериментов, и оказалось, что предположение в моем вопросе неверно - несколько задач, созданных с помощью TPL, не работают одновременно в одном потоке.

ThreadLocalStorage безопасен для использования с TPL в .NET 4.0, поскольку поток может использоваться только одной задачей за раз.

Предполагая, что задачи могут совместно использовать потоки одновременно, было основано на интервью, которое я услышал о С# 5.0 на DotNetRocks (извините, я не помню, какое шоу это было), - поэтому мой вопрос может (или не обязательно) стать актуальным в ближайшее время.

Мой эксперимент запускает ряд задач и записывает, сколько задач выполнялось, сколько времени они занимали и сколько потоков было потреблено. Код ниже, если кто-то захочет его повторить.

class Program
{
    static void Main(string[] args)
    {
        int totalThreads = 100;
        TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
        Task task = null;
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        Task[] allTasks = new Task[totalThreads];
        for (int i = 0; i < totalThreads; i++)
        {
            task = Task.Factory.StartNew(() =>
           {
               DoLongRunningWork();
           }, taskCreationOpt);

            allTasks[i] = task;
        }

        Task.WaitAll(allTasks);
        stopwatch.Stop();

        Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
        Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
        Console.ReadKey();
    }


    private static List<int> threadIds = new List<int>();
    private static object locker = new object();
    private static void DoLongRunningWork()
    {
        lock (locker)
        {
            //Keep a record of the managed thread used.
            if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
        }
        Guid g1 = Guid.NewGuid();
        Trace.CorrelationManager.ActivityId = g1;
        Thread.Sleep(3000);
        Guid g2 = Trace.CorrelationManager.ActivityId;
        Debug.Assert(g1.Equals(g2));
    }
}

Выход (конечно, это будет зависеть от машины):

Completed 100 tasks in 23097 milliseconds
Used 23 threads

Изменение задачиCreationOpt в TaskCreationOptions.LongRunning дало разные результаты:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads

Ответ 2

Пожалуйста, простите, что я опубликовал это как ответ, поскольку он не отвечает на ваш вопрос, однако он связан с вашим вопросом, поскольку он касается поведения CorrelationManager и потоков/задач/и т.д. Я рассматривал использование методов CorrelationManager LogicalOperationStackStartLogicalOperation/StopLogicalOperation) для обеспечения дополнительного контекста в сценариях многопоточности.

Я взял ваш пример и немного изменил его, чтобы добавить возможность параллельной работы с помощью Parallel.For. Кроме того, я использую StartLogicalOperation/StopLogicalOperation для скобки (внутри) DoLongRunningWork. Понятно, DoLongRunningWork делает что-то подобное при каждом его выполнении:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

Я обнаружил, что если я добавлю эти логические операции в ваш код (более или менее как есть), все логические операторы останутся в синхронизации (всегда ожидаемое количество операций над стеком и значения операций в стеке всегда ожидаются).

В некоторых моих собственных тестах я обнаружил, что это не всегда так. Стек логической операции получал "поврежденный". Лучшее объяснение, которое я мог бы придумать, заключается в том, что "слияние" обратной информации CallContext в контексте "родительского" потока, когда поток "child" завершается, вызывает "старую" контекстную информацию дочернего потока (логическую операцию) унаследован "другим дочерним потоком дочернего узла.

Проблема также может быть связана с тем фактом, что Parallel.For, по-видимому, использует основной поток (по крайней мере, в примере кода, как написано) как один из "рабочих потоков" (или то, что они должны вызывать в параллельной домен). Всякий раз, когда выполняется DoLongRunningWork, запускается новая логическая операция (в начале) и останавливается (в конце) (то есть нажимается на LogicalOperationStack и отбрасывается обратно). Если основной поток уже имеет действующую логическую операцию, и если DoLongRunningWork выполняет ON THE MAIN THREAD, то запускается новая логическая операция, поэтому основной поток LogicalOperationStack теперь имеет две операции. Любые последующие исполнения DoLongRunningWork (до тех пор, пока эта "итерация" DoLongRunningWork выполняется в основном потоке), по-видимому, наследует основной поток LogicalOperationStack (который теперь имеет две операции над ним, а не только одну ожидаемую операцию).

Мне потребовалось много времени, чтобы понять, почему поведение LogicalOperationStack в моем примере было иным, чем в моей модифицированной версии вашего примера. Наконец, я увидел, что в моем коде я заключил в скобки всю программу в логической операции, тогда как в моей модифицированной версии вашей тестовой программы я этого не сделал. Подразумевается, что в моей тестовой программе каждый раз, когда выполнялась моя "работа" (аналогичная DoLongRunningWork), уже была логическая операция. В моей модифицированной версии вашей тестовой программы я не заключил в скобки всю программу в логической операции.

Итак, когда я изменил вашу тестовую программу, чтобы скопировать всю программу в логической операции И если я использую Parallel.For, я столкнулся с точно такой же проблемой.

Используя концептуальную модель выше, это будет успешно работать:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

В то время как это будет в конечном итоге утверждать из-за, по-видимому, несинхронизации LogicalOperationStack:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

Вот моя примерная программа. Он похож на ваш, поскольку он имеет метод DoLongRunningWork, который управляет ActivityId, а также LogicalOperationStack. У меня также есть два варианта ношения DoLongRunningWork. Один вкус использует Tasks, для которого используется Parallel.For. Каждый вкус также может быть выполнен таким образом, что вся параллельная операция заключена в логическую операцию или нет. Таким образом, существует четыре способа выполнения параллельной операции. Чтобы попробовать каждый, просто раскомментируйте желаемый метод "Использовать...", перекомпилируйте и запустите. UseTasks, UseTasks(true) и UseParallelFor должны выполняться до завершения. UseParallelFor(true) будет утверждать в какой-то момент, потому что LogicalOperationStack не имеет ожидаемого количества записей.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

Вся эта проблема: если LogicalOperationStack может использоваться с Parallel.For(и/или другими конструкциями потоков/задач) или как его можно использовать, вероятно, заслуживает собственного вопроса. Может быть, я отправлю вопрос. Тем временем, я задаюсь вопросом, есть ли у вас какие-либо мысли по этому поводу (или, интересно, рассмотрели ли вы использование LogicalOperationStack, поскольку ActivityId окажется в безопасности).

[EDIT]

См. мой ответ на этот вопрос для получения дополнительной информации об использовании LogicalOperationStack и/или CallContext.LogicalSetData с некоторыми различными конструкциями Thread/ThreadPool/Task/Parallel.

См. также мой вопрос здесь о SO о логических операционных и параллельных расширениях: Является CorrelationManager.LogicalOperationStack совместимым с Parallel.For, задачами, темами и т.д.

Наконец, см. также мой вопрос здесь на форуме Microsoft Parallel Extensions: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

В моем тестировании это выглядит так: Trace.CorrelationManager.LogicalOperationStack может испортиться при использовании Parallel.For или Parallel.Invoke ЕСЛИ вы начинаете логическую операцию в основном потоке, а затем запускаете/останавливаете логические операции в делетете. В моих тестах (см. Одну из двух вышеперечисленных ссылок) в LogicalOperationStack всегда должно быть ровно 2 записи при выполнении DoLongRunningWork (если я начинаю логическую операцию в основном потоке перед тем, как начинать DoLongRunningWork с использованием различных методов). Итак, под "коррумпированным" я подразумеваю, что LogicalOperationStack в конечном итоге будет содержать больше двух записей.

Из того, что я могу сказать, вероятно, потому, что Parallel.For и Parallel.Invoke используют основной поток как один из "рабочих" потоков для выполнения действия DoLongRunningWork.

Использование стека, хранящегося в CallContext.LogicalSetData для имитации поведения LogicalOperationStack (аналогично log4net LogicalThreadContext.Stacks, который хранится через CallContext.SetData) дает еще худшие результаты. Если я использую такой стек для поддержки контекста, он становится поврежденным (т.е. Не имеет ожидаемого количества записей) почти во всех сценариях, где у меня есть "логическая операция" в основном потоке и логическая операция на каждой итерации/выполнение делегата DoLongRunningWork.