Как я могу сделать этот цикл С# быстрее?

Резюме: Рид-ответ ниже, если вы хотите остаться на С#. Если вы готовы маршалировать на С++ (что я и есть), это более быстрое решение.

У меня есть два массива 55mb ushort в С#. Я объединять их, используя следующий цикл:

float b = (float)number / 100.0f;
for (int i = 0; i < length; i++)
{
      image.DataArray[i] = 
          (ushort)(mUIHandler.image1.DataArray[i] + 
          (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
}

Этот код, в соответствии с добавлением вызовов DateTime.Now до и после, занимает 3,5 секунды для запуска. Как я могу сделать это быстрее?

EDIT: Вот какой код, который, я думаю, показывает корень проблемы. Когда следующий код запускается в новом WPF-приложении, я получаю следующие результаты:

Time elapsed: 00:00:00.4749156 //arrays added directly
Time elapsed: 00:00:00.5907879 //arrays contained in another class
Time elapsed: 00:00:02.8856150 //arrays accessed via accessor methods

Итак, когда массивы идут напрямую, время намного быстрее, чем если массивы находятся внутри другого объекта или контейнера. Этот код показывает, что каким-то образом я использую метод доступа, а не напрямую обращаюсь к массивам. Тем не менее, самый быстрый, который я, похоже, получаю, составляет полсекунды. Когда я запускаю второй список кода на С++ с помощью icc, я получаю:

Run time for pointer walk: 0.0743338

В этом случае С++ быстрее на 7 раз (с использованием icc, не уверен, что с помощью msvc можно получить такую ​​же производительность). Я не так хорошо знаком с оптимизациями там). Есть ли способ получить С# около этого уровня производительности С++, или мне просто нужно, чтобы С# вызывал мою С++-процедуру?

Листинг 1, код С#:

public class ArrayHolder
{
    int length;
    public ushort[] output;
    public ushort[] input1;
    public ushort[] input2;
    public ArrayHolder(int inLength)
    {
        length = inLength;
        output = new ushort[length];
        input1 = new ushort[length];
        input2 = new ushort[length];
    }

    public ushort[] getOutput() { return output; }
    public ushort[] getInput1() { return input1; }
    public ushort[] getInput2() { return input2; }
}


/// <summary>
/// Interaction logic for MainWindow.xaml
/// </summary>
public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();


        Random random = new Random();

        int length = 55 * 1024 * 1024;
        ushort[] output = new ushort[length];
        ushort[] input1 = new ushort[length];
        ushort[] input2 = new ushort[length];

        ArrayHolder theArrayHolder = new ArrayHolder(length);

        for (int i = 0; i < length; i++)
        {
            output[i] = (ushort)random.Next(0, 16384);
            input1[i] = (ushort)random.Next(0, 16384);
            input2[i] = (ushort)random.Next(0, 16384);
            theArrayHolder.getOutput()[i] = output[i];
            theArrayHolder.getInput1()[i] = input1[i];
            theArrayHolder.getInput2()[i] = input2[i];
        }

        Stopwatch stopwatch = new Stopwatch(); 
        stopwatch.Start();
        int number = 44;
        float b = (float)number / 100.0f;
        for (int i = 0; i < length; i++)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b * (float)input2[i]));
        } 
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        for (int i = 0; i < length; i++)
        {
            theArrayHolder.output[i] =
                (ushort)(theArrayHolder.input1[i] +
                (ushort)(b * (float)theArrayHolder.input2[i]));
        }
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        for (int i = 0; i < length; i++)
        {
            theArrayHolder.getOutput()[i] =
                (ushort)(theArrayHolder.getInput1()[i] +
                (ushort)(b * (float)theArrayHolder.getInput2()[i]));
        }
        stopwatch.Stop();

        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
    }
}

Листинг 2, эквивалент С++:   //looptiming.cpp: Определяет точку входа для консольного приложения.   //

#include "stdafx.h"
#include <stdlib.h>
#include <windows.h>
#include <stdio.h>
#include <iostream>


int _tmain(int argc, _TCHAR* argv[])
{

    int length = 55*1024*1024;
    unsigned short* output = new unsigned short[length];
    unsigned short* input1 = new unsigned short[length];
    unsigned short* input2 = new unsigned short[length];
    unsigned short* outPtr = output;
    unsigned short* in1Ptr = input1;
    unsigned short* in2Ptr = input2;
    int i;
    const int max = 16384;
    for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
        *outPtr = rand()%max;
        *in1Ptr = rand()%max;
        *in2Ptr = rand()%max;
    }

    LARGE_INTEGER ticksPerSecond;
    LARGE_INTEGER tick1, tick2;   // A point in time
    LARGE_INTEGER time;   // For converting tick into real time


    QueryPerformanceCounter(&tick1);

    outPtr = output;
    in1Ptr = input1;
    in2Ptr = input2;
    int number = 44;
    float b = (float)number/100.0f;


    for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
        *outPtr = *in1Ptr + (unsigned short)((float)*in2Ptr * b);
    }
    QueryPerformanceCounter(&tick2);
    QueryPerformanceFrequency(&ticksPerSecond);

    time.QuadPart = tick2.QuadPart - tick1.QuadPart;

    std::cout << "Run time for pointer walk: " << (double)time.QuadPart/(double)ticksPerSecond.QuadPart << std::endl;

    return 0;
}

EDIT 2: Включение /QxHost во втором примере сокращает время до 0.0662714 секунд. Модифицирование первого цикла в качестве предложения @Reed приводит меня к

Истекшее время: 00: 00: 00.3835017

Итак, все еще недостаточно быстро для слайдера. Это время через код:

        stopwatch.Start();
        Parallel.ForEach(Partitioner.Create(0, length),
         (range) =>
         {
             for (int i = range.Item1; i < range.Item2; i++)
             {
                 output[i] =
                     (ushort)(input1[i] +
                     (ushort)(b * (float)input2[i]));
             }
         });

        stopwatch.Stop();

РЕДАКТИРОВАТЬ 3. В соответствии с предложением @Eric Lippert, я перезапустил код на С# в версии и вместо того, чтобы использовать прикрепленный отладчик, просто распечатайте результаты в диалоговом окне. Это:

  • Простые массивы: ~ 0.273s
  • Содержащиеся массивы: ~ 0.330s
  • Массивы Accessor: ~ 0.345s
  • Параллельные массивы: ~ 0.190s

(эти числа исходят из среднего значения 5)

Итак, параллельное решение, безусловно, быстрее, чем 3,5 секунды, которые я получал раньше, но все еще немного ниже 0.074 секунд, достижимых с использованием процессора non-icc. Похоже, что самое быстрое решение заключается в компиляции в выпуске, а затем на маркер с компилятором icc-скомпилированного С++, что делает возможным использование ползунка.

EDIT 4: Еще три предложения от @Eric Lippert: измените внутреннюю часть цикла for от length до array.length, используйте удвоения и попробуйте небезопасный код.

Для этих трех моментов времени:

  • длина: ~ 0.274s
  • удваивает, а не плавает: ~ 0.290s
  • небезопасно: ~ 0.376s

До сих пор параллельное решение является большим победителем. Хотя, если бы я мог добавить их через шейдер, возможно, я мог видеть, что там есть ускорение...

Здесь дополнительный код:

        stopwatch.Reset();

        stopwatch.Start();

        double b2 = ((double)number) / 100.0;
        for (int i = 0; i < output.Length; ++i)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b2 * (double)input2[i]));
        }

        stopwatch.Stop();
        DoubleArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
        stopwatch.Reset();

        stopwatch.Start();

        for (int i = 0; i < output.Length; ++i)
        {
            output[i] =
                (ushort)(input1[i] +
                (ushort)(b * input2[i]));
        }

        stopwatch.Stop();
        LengthArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);
        stopwatch.Reset();

        stopwatch.Start();
        unsafe
        {
            fixed (ushort* outPtr = output, in1Ptr = input1, in2Ptr = input2){
                ushort* outP = outPtr;
                ushort* in1P = in1Ptr;
                ushort* in2P = in2Ptr;
                for (int i = 0; i < output.Length; ++i, ++outP, ++in1P, ++in2P)
                {
                    *outP = (ushort)(*in1P + b * (float)*in2P);
                }
            }
        }

        stopwatch.Stop();
        UnsafeArrayLabel.Content += "\t" + stopwatch.Elapsed.Seconds + "." + stopwatch.Elapsed.Milliseconds;
        Console.WriteLine("Time elapsed: {0}",
            stopwatch.Elapsed);

Ответы

Ответ 1

Это должно быть идеально параллелизуемо. Однако, учитывая небольшой объем работы, выполняемой для каждого элемента, вам нужно будет справиться с этим с особой осторожностью.

Правильный способ сделать это (в .NET 4) - использовать Parallel.ForEach в сочетании с Partitioner:

float b = (float)number / 100.0f;
Parallel.ForEach(Partitioner.Create(0, length), 
(range) =>
{
   for (int i = range.Item1; i < range.Item2; i++)
   {
      image.DataArray[i] = 
          (ushort)(mUIHandler.image1.DataArray[i] + 
          (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
   }
});

Это позволит эффективно разделить работу по доступным ядрам обработки в вашей системе и обеспечить достойное ускорение, если у вас несколько ядер.

Если говорить, это в лучшем случае ускорит эту операцию только по количеству ядер в вашей системе. Если вам нужно ускорить его, вам, скорее всего, придется вернуться к сочетанию распараллеливания и небезопасного кода. В этот момент, возможно, стоит подумать об альтернативах, чтобы попытаться представить это в режиме реального времени.

Ответ 2

Предполагая, что у вас много таких ребят, вы можете попытаться распараллелить операцию (и используете .NET 4):

Parallel.For(0, length, i=>
   {
       image.DataArray[i] = 
      (ushort)(mUIHandler.image1.DataArray[i] + 
      (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
   });

Конечно, все будет зависеть от того, стоит ли это распараллеливать. Это утверждение выглядит довольно коротко вычислительно; доступ к индексам по номеру довольно быстрый, как есть. Вы можете получить прибыль, потому что этот цикл выполняется много раз с таким количеством данных.