Как я могу убедиться, что блок потока данных создает потоки только по требованию?

Я написал небольшой конвейер, используя API потока данных TPL, который получает данные из нескольких потоков и выполняет обработку на них.

Настройка 1

Когда я настраиваю его на использование MaxDegreeOfParallelism = Environment.ProcessorCount (приходит в 8 в моем случае) для каждого блока, я замечаю, что он заполняет буферы в нескольких потоках, а обработка второго блока начинается не раньше, чем до + 1700 элементов были получены во всех потоках. Вы можете увидеть это в действии здесь.

Настройка 2

Когда я устанавливаю MaxDegreeOfParallelism = 1, я замечаю, что все элементы получены в одном потоке, и обработка отправки уже начинается после приема + - 40 элементов. Данные здесь.

Настройка 3

Когда я устанавливаю MaxDegreeOfParallelism = 1 и вводю задержку в 1000 мс перед отправкой каждого входа, я замечаю, что элементы отправляются сразу после их получения, и каждый полученный элемент помещается в отдельный поток. Данные здесь.


Пока настройка. Мои вопросы следующие:

  • Когда я сравниваю установки 1 и 2, я замечаю, что элементы обработки запускаются намного быстрее, когда они выполняются последовательно по сравнению с параллельными (даже после учета того факта, что параллель имеет 8x столько потоков). Что вызывает эту разницу?

  • Так как это будет запущено в среде ASP.NET, я не хочу создавать ненужные потоки, поскольку все они происходят из одного потока. Как показано в настройке 3, он все равно будет распространяться на несколько потоков, даже если имеется только несколько данных. Это также удивительно, потому что из установки 1 я бы предположил, что данные распределяются последовательно по потокам (обратите внимание, как первые 50 элементов все идут в поток 16). Могу ли я убедиться, что он создает новые потоки только по запросу?

  • Существует еще одна концепция, называемая BufferBlock<T>. Если вход TransformBlock<T> уже вставлен в очередь, какова будет практическая разница в замене первого шага в моем конвейере (ReceiveElement) для BufferBlock?


class Program
{
    static void Main(string[] args)
    {
        var dataflowProcessor = new DataflowProcessor<string>();
        var amountOfTasks = 5;
        var tasks = new Task[amountOfTasks];

        for (var i = 0; i < amountOfTasks; i++)
        {
            tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
        }

        foreach (var task in tasks)
        {
            task.Start();
        }

        Task.WaitAll(tasks);
        Console.WriteLine("Finished feeding threads"); // Needs to use async main
        Console.Read();
    }

    private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
    {
        return new Task(async () =>
        {
            await FeedData(dataflowProcessor, taskName);
        });
    }

    private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
    {
        foreach (var i in Enumerable.Range(0, short.MaxValue))
        {
            await Task.Delay(1000); // Only used for the delayedSerialProcessing test
            dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
        }
    }
}


public class DataflowProcessor<T>
{
    private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
    {
        Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
        return element;
    }, ExecutionOptions);

    private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
    {
        Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(element);
    }, ExecutionOptions);

    static DataflowProcessor()
    {
        ReceiveElement.LinkTo(SendElement);

        ReceiveElement.Completion.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                ((IDataflowBlock) ReceiveElement).Fault(x.Exception);
            }
            else
            {
                ReceiveElement.Complete();
            }
        });
    }


    public void Process(T newElement)
    {      
        ReceiveElement.Post(newElement);
    }
}

Ответы

Ответ 1

Прежде чем развертывать решение в среде ASP.NET, я предлагаю вам изменить вашу архитектуру: IIS может приостанавливать потоки в ASP.NET для собственного использования после обработки запроса, чтобы ваша задача могла быть незавершенной. Лучшим подходом является создание отдельного демона службы Windows, который обрабатывает поток данных.

Теперь вернемся к потоку данных TPL.

Мне нравится библиотека TPL Dataflow, но документация - настоящий беспорядок. Единственный полезный документ, который я нашел, - Введение в поток данных TPL.

В нем есть некоторые подсказки, которые могут быть полезны, особенно те, которые касаются настроек конфигурации (я предлагаю вам исследовать реализацию собственного TaskScheduler с использованием вашей собственной реализации TheadPool и MaxMessagesPerTask), если вы необходимо:

Встроенные блоки потока данных настраиваются, при этом обеспечивается большой контроль над тем, как и где блоки выполняют свою работу. Вот некоторые ключевые кнопки, доступные разработчику, все из которых отображаются через класс DataflowBlockOptions и его производные типы (ExecutionDataflowBlockOptions и GroupingDataflowBlockOptions), экземпляры которых могут быть предоставлены блокам во время строительства.

  • Настройка TaskScheduler, как упоминалось в @i3arnon:

    По умолчанию блок данных блокирует работу с расписанием TaskScheduler.Default, которая нацелена на внутренние действия .NET ThreadPool.

  • MaxDegreeOfParallelism

    По умолчанию используется значение 1, что означает, что в блоке может быть только одна вещь. Если установлено значение, превышающее 1, это количество сообщений может обрабатываться одновременно блоком. Если установлено значение DataflowBlockOptions.Unbounded (-1), любое количество сообщений может обрабатываться одновременно, причем максимум автоматически управляется базовым планировщиком, предназначенным блоком потока данных. Обратите внимание, что MaxDegreeOfParallelism является максимальным, а не обязательным.

  • MaxMessagesPerTask

    TPL Dataflow фокусируется на эффективности и управлении. Там, где есть необходимые компромиссы между ними, система стремится обеспечить качество по умолчанию, но также позволяет разработчику настраивать поведение в соответствии с конкретной ситуацией. Одним из таких примеров является компромисс между эффективностью и честностью. По умолчанию блоки потока данных пытаются минимизировать количество объектов задачи, необходимых для обработки всех их данных. Это обеспечивает очень эффективное выполнение; до тех пор, пока у блока есть данные, которые могут быть обработаны, блокировка задач останется для обработки доступных данных, только уходит в отставку, когда больше нет данных (пока данные снова не появятся, и в этот момент больше задач будет развернуто). Однако это может привести к проблемам справедливости. Если система в настоящее время насыщена обработкой данных из заданного набора блоков, а затем данные поступают в другие блоки, эти последние блоки будут либо ждать, пока первые блоки закончат обработку, прежде чем они смогут начать, или, альтернативно, риск переназначение системы. Это может быть или не быть правильным поведением для данной ситуации. Чтобы решить эту проблему, существует опция MaxMessagesPerTask. По умолчанию он равен DataflowBlockOptions.Unbounded (-1), что означает, что максимального значения нет. Однако, если установлено положительное число, это число будет представлять максимальное количество сообщений, которые данный блок может использовать для обработки одной задачи. Как только это ограничение достигнуто, блок должен удалить задание и заменить его репликой для продолжения обработки. Эти реплики обрабатываются справедливо в отношении всех других задач, запланированных планировщику, что позволяет блокам достичь скромности между ними. В крайнем случае, если MaxMessagesPerTask установлено в 1, для каждого сообщения будет использоваться одна задача, обеспечивающая предельную справедливость при потенциальном расходе большего количества задач, чем это могло бы быть в противном случае.

  • MaxNumberOfGroups

    Блоки группировки способны отслеживать количество групп, которые они создали, и автоматически завершают себя (отклоняя дальнейшие предлагаемые сообщения) после того, как было создано такое количество групп. По умолчанию количество групп DataflowBlockOptions.Unbounded(-1), но может быть явно установлено значение больше единицы.

  • CancellationToken

    Этот токен контролируется во время жизни блоков потока данных. Если запрос на отмену доходит до завершения блоков, блок прекратит работу как можно более вежливо и быстро.

  • Жадный

    По умолчанию целевые блоки жадные и хотят, чтобы все данные им предлагались.

  • BoundedCapacity

    Это предел количества элементов, которые блок может хранить и иметь в полете в любой момент времени.