Самый эффективный способ обработки очереди с помощью потоков

У меня есть очередь, на которую размещаются ожидающие запросы преобразования Фурье (относительно длительные операции) - в некоторых случаях мы могли бы получать тысячи запросов на преобразование в секунду, поэтому его нужно быстро.

Я обновляю старый код, чтобы использовать .net 4, а также портирование в TPL. Мне интересно, как выглядит наиболее эффективный (самый быстрый пропуск) способ обработки этой очереди. Я хотел бы использовать все доступные ядра.

В настоящее время я экспериментирую с BlockingCollection. Я создаю класс обработчика очереди, который порождает 4 задачи, которые блокируются на BlockingCollection и ждут входящей работы. Затем они обрабатывают это ожидающее преобразование. Код:

public class IncomingPacketQueue : IDisposable
    {
        BlockingCollection<IncomingPacket> _packetQ = new BlockingCollection<IncomingPacket>();

        public IncomingPacketQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueSweep(IncomingPacket incoming)
        {
            _packetQ.Add(incoming);
        }

        private void Consume()
        {
            foreach (var sweep in _packetQ.GetConsumingEnumerable())
            {
                //do stuff
                var worker = new IfftWorker();
                Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                worker.DoIfft(sweep);                

            }
        }

        public int QueueCount
        {
            get
            {
                return _packetQ.Count;
            }
        }

    #region IDisposable Members

    public void Dispose()
    {
        _packetQ.CompleteAdding();
    }

    #endregion
    }

Это похоже на хорошее решение? Похоже, что все ядра - это максимум, хотя я в настоящее время не знаю, сколько рабочих я должен создать в своем конструкторе.

Ответы

Ответ 1

Это выглядит разумно. Я нашел BlockingCollection довольно быстро. Я использую его для обработки десятков тысяч запросов в секунду.

Если ваше приложение связано с процессором, то вы, вероятно, не хотите создавать больше работников, чем у вас есть ядра. Конечно, вы не хотите создавать гораздо больше рабочих, чем ядра. На четырехъядерном процессоре, если вы ожидаете большую часть времени, потраченного на выполнение БПФ, тогда четыре рабочих будут есть весь процессор. Больше работников просто означает, что у вас есть переключатели контекста потока, с которыми приходится иметь дело. TPL, как правило, балансирует это для вас, но нет причин создавать, скажем, 100 работников, если вы не можете справиться с чем-то большим.

Я бы предположил, что вы проводите тесты с 3, 4, 5, 6, 7 и 8 работниками. Посмотрите, какая из них дает вам максимальную пропускную способность.

Ответ 2

Я согласен с Джимом. Ваш подход выглядит очень хорошо. Вы не станете намного лучше. Я не эксперт FFT, но я предполагаю, что эти операции почти на 100% связаны с процессором. Если это действительно так, то хорошей предпосылкой для числа рабочих будет прямая корреляция 1 к 1 с количеством ядер в машине. Вы можете использовать Environment.ProcessorCount, чтобы получить это значение. Вы можете экспериментировать с множителем, скажем, 2x или 4x, но опять же, если эти операции связаны с ЦП, то что-либо выше 1x может вызвать дополнительные накладные расходы. Использование Environment.ProcessorCount сделает ваш код более портативным.

Другое предложение... пусть TPL знает, что это выделенные потоки. Вы можете сделать это, указав опцию LongRunning.

public IncomingPacketQueue()
{
    for (int i = 0; i < Environment.ProcessorCount; i++)
    {
        Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }
}

Ответ 3

Почему бы не использовать Parallel.ForEach и позволить TPL обрабатывать количество созданных потоков.

        Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioneenter(_packetQ),
                         sweep => {
                           //do stuff
                           var worker = new IfftWorker();
                           Trace.WriteLine("  Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId));
                           worker.DoIfft(sweep);                

                         });

(GetConsumingPartitioner является частью ParallelExtensionsExtras)

Ответ 4

Сделать число рабочих настраиваемым. Также слишком много рабочих, и это будет медленнее (как указано другим плакатом), поэтому вам нужно найти сладкое место. Конфигурируемое значение позволяет тестовым прогонам находить оптимальное значение или позволять вашей программе адаптироваться к различным типам оборудования. Вы, безусловно, можете добавить это значение в App.Config и прочитать его при запуске.

Ответ 5

Вы также можете попробовать использовать PLINQ для параллелизации обработки, чтобы увидеть, как она сравнивается с подходом, который вы используете в данный момент. У него есть некоторые трюки в рукаве, которые могут сделать его очень эффективным при определенных обстоятельствах.

_packetQ.GetConsumingEnumerable().AsParallel().ForAll(
    sweep => new IfftWorker().DoIfft(sweep));