Ускорение цикла с использованием многопоточности в С# (вопрос)

Представьте, что у меня есть функция, которая проходит через миллион/миллиард строк и проверяет что-то в них.

f.ex:

foreach (String item in ListOfStrings)
{
    result.add(CalculateSmth(item));
}

он потребляет много времени, потому что CalculateSmth - очень трудоемкая функция.

Я хочу спросить: как интегрировать многопоточность в этот процесс?

f.ex: Я хочу запустить 5 потоков, и каждый из них возвращает некоторые результаты, и это продолжается до тех пор, пока список не получит элементы.

Может быть, кто-нибудь может показать некоторые примеры или статьи.

Забыл отметить, что мне это нужно в .NET 2.0

Ответы

Ответ 1

Вы можете попробовать Parallel extensions (часть .NET 4.0)

Это позволяет вам написать что-то вроде:

Parallel.Foreach (ListOfStrings, (item) => 
    result.add(CalculateSmth(item));
);

Конечно, result.add должен быть потокобезопасным.

Ответ 2

Расширения Parallel - это классно, но это также можно сделать, просто используя threadpool:

using System.Collections.Generic;
using System.Threading;

namespace noocyte.Threading
{
    class CalcState
    {
        public CalcState(ManualResetEvent reset, string input) {
            Reset = reset;
            Input = input;
        }
        public ManualResetEvent Reset { get; private set; }
        public string Input { get; set; }
    }

    class CalculateMT
    {
        List<string> result = new List<string>();
        List<ManualResetEvent> events = new List<ManualResetEvent>();

        private void Calc() {
            List<string> aList = new List<string>();
            aList.Add("test");

            foreach (var item in aList)
            {
                CalcState cs = new CalcState(new ManualResetEvent(false), item);
                events.Add(cs.Reset);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Calculate), cs);
            }
            WaitHandle.WaitAll(events.ToArray());
        }

        private void Calculate(object s)
        {
            CalcState cs = s as CalcState;
            cs.Reset.Set();
            result.Add(cs.Input);
        }
    }
}

Ответ 3

Обратите внимание, что concurrency не волшебным образом дает вам больше ресурсов. Вам нужно установить, что замедляет CalculateSmth.

Например, если он привязан к процессору (и вы находитесь на одном ядре), то такое же количество тиков ЦП переходит к коду, независимо от того, выполняете ли вы их последовательно или параллельно. Кроме того, вы можете получить некоторые накладные расходы на управление потоками. Тот же аргумент применяется к другим ограничениям (например, I/O)

Вы получите только прирост производительности в этом случае, если CalculateSmth оставляет ресурс свободным во время его выполнения, который может использоваться другим экземпляром. Это не редкость. Например, если задача включает в себя IO, за которым следует некоторый процессор, тогда процесс 1 может выполнять материал процессора, пока процесс 2 выполняет IO. Как указывает маты, цепочка производителей-потребителей может достичь этого, если у вас есть инфраструктура.

Ответ 4

Вам нужно разделить работу, которую вы хотите сделать параллельно. Вот пример того, как вы можете разделить работу на две части:

List<string> work = (some list with lots of strings)

// Split the work in two
List<string> odd = new List<string>();
List<string> even = new List<string>();
for (int i = 0; i < work.Count; i++)
{
    if (i % 2 == 0)
    {
        even.Add(work[i]);
    }
    else
    {
        odd.Add(work[i]);
    }
}

// Set up to worker delegates
List<Foo> oddResult = new List<Foo>();
Action oddWork = delegate { foreach (string item in odd) oddResult.Add(CalculateSmth(item)); };

List<Foo> evenResult = new List<Foo>();
Action evenWork = delegate { foreach (string item in even) evenResult.Add(CalculateSmth(item)); };

// Run two delegates asynchronously
IAsyncResult evenHandle = evenWork.BeginInvoke(null, null);
IAsyncResult oddHandle = oddWork.BeginInvoke(null, null);

// Wait for both to finish
evenWork.EndInvoke(evenHandle);
oddWork.EndInvoke(oddHandle);

// Merge the results from the two jobs
List<Foo> allResults = new List<Foo>();
allResults.AddRange(oddResult);
allResults.AddRange(evenResult);

return allResults;

Ответ 5

Первый вопрос, на который вы должны ответить, - использовать ли потоки

Если ваша функция CalculateSmth() в основном связана с ЦП, т.е. тяжелая загрузка процессора и, в основном, не использование ввода-вывода, то мне трудно найти точку использования потоков, поскольку потоки будут конкурировать тот же ресурс, в этом случае процессор.

Если ваш CalculateSmth() использует как CPU, так и I/O, тогда это может быть вопрос использования потоков.

Я полностью согласен с комментарием к моему ответу. Я сделал ошибочное предположение, что речь идет о одном процессоре с одним ядром, но в наши дни у нас многоядерные процессоры, мне плохо.

Ответ 6

Не то, чтобы у меня есть хорошие статьи здесь, но то, что вы хотите сделать, - это что-то вроде Producer-Consumer с Threadpool.

Продюсеры проводят циклы и создают задачи (что в данном случае может состоять в простое размещение элементов в списке или стеке). Потребители - это, скажем, пять потоков, которые читают один элемент из стека, потребляют его, вычисляя его, а затем сохраняют его там где.

Таким образом, многопоточность ограничивается только этими пятью потоками, и все они будут работать, пока стек не станет пустым.

О чем подумать:

  • Поместите защиту в список ввода и вывода, например, мьютекс.
  • Если порядок важен, убедитесь, что порядок вывода сохранен. Одним из примеров может быть их сохранение в SortedList или что-то в этом роде.
  • Убедитесь, что CalculateSmth является потокобезопасным, что он не использует глобальное состояние.