Java масштабируется намного хуже, чем С# по многим ядрам?

Я тестирую нерест многих потоков, выполняющих ту же функцию на 32-ядерном сервере для Java и С#. Я запускаю приложение с 1000 итерациями функции, которая загружается через 1,2,4,8, 16 или 32 потока, используя threadpool.

В 1, 2, 4, 8 и 16 параллельных потоках Java по крайней мере в два раза быстрее, чем С#. Однако, по мере увеличения количества потоков, разрыв закрывается, а на 32 потока С# имеет почти такое же среднее время выполнения, но Java иногда занимает 2000 мс (тогда как оба языка обычно работают около 400 мс). Java начинает ухудшаться с массивными всплесками за время, затрачиваемое на итерацию потоков.

EDIT Это Windows Server 2008

EDIT2 Я изменил приведенный ниже код, чтобы показать использование пула потоков службы Executor. Я также установил Java 7.

Я установил следующие оптимизации в горячей точке VM:

-XX: + UseConcMarkSweepGC -Xmx 6000

но он все еще не сделал ничего лучше. Единственное различие между кодом заключается в том, что im использует нижеследующий threadpool и для используемой версии С#:

http://www.codeproject.com/Articles/7933/Smart-Thread-Pool

Есть ли способ сделать Java более оптимизированным? Perhaos, вы могли бы объяснить, почему я вижу это огромное снижение производительности?

Есть ли более эффективный Java threadpool?

(Обратите внимание, что я не имею в виду, изменив тестовую функцию)

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class PoolDemo {

    static long FastestMemory = 2000000;
    static long SlowestMemory = 0;
    static long TotalTime;
    static int[] FileArray;
    static DataOutputStream outs;
    static FileOutputStream fout;
    static Byte myByte = 0;

  public static void main(String[] args) throws InterruptedException, FileNotFoundException {

        int Iterations = Integer.parseInt(args[0]);
        int ThreadSize = Integer.parseInt(args[1]);

        FileArray = new int[Iterations];
        fout = new FileOutputStream("server_testing.csv");

        // fixed pool, unlimited queue
        ExecutorService service = Executors.newFixedThreadPool(ThreadSize);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) service;

        for(int i = 0; i<Iterations; i++) {
          Task t = new Task(i);
          executor.execute(t);
        }

        for(int j=0; j<FileArray.length; j++){
            new PrintStream(fout).println(FileArray[j] + ",");
        }
      }

  private static class Task implements Runnable {

    private int ID;

    public Task(int index) {
      this.ID = index;
    }

    public void run() {
        long Start = System.currentTimeMillis();

        int Size1 = 100000;
        int Size2 = 2 * Size1;
        int Size3 = Size1;

        byte[] list1 = new byte[Size1];
        byte[] list2 = new byte[Size2];
        byte[] list3 = new byte[Size3];

        for(int i=0; i<Size1; i++){
            list1[i] = myByte;
        }

        for (int i = 0; i < Size2; i=i+2)
        {
            list2[i] = myByte;
        }

        for (int i = 0; i < Size3; i++)
        {
            byte temp = list1[i];
            byte temp2 = list2[i];
            list3[i] = temp;
            list2[i] = temp;
            list1[i] = temp2;
        }

        long Finish = System.currentTimeMillis();
        long Duration = Finish - Start;
        TotalTime += Duration;
        FileArray[this.ID] = (int)Duration;
        System.out.println("Individual Time " + this.ID + " \t: " + (Duration) + " ms");


        if(Duration < FastestMemory){
            FastestMemory = Duration;
        }
        if (Duration > SlowestMemory)
        {
            SlowestMemory = Duration;
        }
    }
  }
}

Ответы

Ответ 1

Резюме

Ниже приведены исходный ответ, обновление 1 и обновление 2. В обновлении 1 рассказывается о том, как работать с условиями гонки вокруг тестовых переменных статистики, используя структуры concurrency. Обновление 2 - это гораздо более простой способ справиться с проблемой состояния гонки. Надеюсь, у меня больше нет обновлений - извините за длину ответа, но многопоточное программирование сложно!

Оригинальный ответ

Единственное различие между кодом заключается в том, что im, используя ниже ThreadPool

Я бы сказал, что это абсолютно огромная разница. Сложно сравнивать производительность двух языков, когда их реализации пула потоков являются совершенно разными блоками кода, написанными в пространстве пользователя. Реализация пула потоков может иметь огромное влияние на производительность.

Вам следует рассмотреть возможность использования собственных встроенных пулов потоков Java. См. ThreadPoolExecutor и весь java.util.concurrent пакет, частью которого он является. Класс Executors имеет удобные статические методы factory для пулов и является хорошим интерфейсом более высокого уровня. Все, что вам нужно, это JDK 1.5+, но чем больше, тем лучше. Решения fork/join, упомянутые другими плакатами, также являются частью этого пакета - как уже упоминалось, они требуют 1.7 +.

Обновление 1 - Адресация условий гонки с использованием структур concurrency

У вас есть условия гонки вокруг настроек FastestMemory, SlowestMemory и TotalTime. Для первых двух вы выполняете тестирование < и >, а затем установку более чем на один шаг. Это не атомный; есть вероятность, что другой поток обновит эти значения между тестированием и настройкой. Установка += TotalTime также неатомна: тест и замаскирован.

Вот некоторые предлагаемые исправления.

TotalTime

Цель здесь - поточный, атомный += of TotalTime.

// At the top of everything
import java.util.concurrent.atomic.AtomicLong;  

...    

// In PoolDemo
static AtomicLong TotalTime = new AtomicLong();    

...    

// In Task, where you currently do the TotalTime += piece
TotalTime.addAndGet (Duration); 

FastestMemory/SlowestMemory

Цель здесь - тестировать и обновлять FastestMemory и SlowestMemory каждый на атомном шаге, поэтому нить не может скользить между этапами тестирования и обновления, чтобы вызвать условие гонки.

Самый простой подход:

Защитите тестирование и настройку переменных, используя сам класс в качестве монитора. Нам нужен монитор, который содержит переменные, чтобы гарантировать синхронизированную видимость (спасибо @A.H. для этого.) Мы должны использовать сам класс, потому что все static.

// In Task
synchronized (PoolDemo.class) {
    if (Duration < FastestMemory) {
        FastestMemory = Duration;
    }

    if (Duration > SlowestMemory) {
        SlowestMemory = Duration;
    }
}

Промежуточный подход:

Вам может не понравиться брать весь класс для монитора или подвергать монитор с помощью класса и т.д. Вы можете сделать отдельный монитор, который сам не содержит FastestMemory и SlowestMemory, но затем вы запустите в проблемы видимости синхронизации. Вы обойдете это, используя ключевое слово volatile.

// In PoolDemo
static Integer _monitor = new Integer(1);
static volatile long FastestMemory = 2000000;
static volatile long SlowestMemory = 0;

...

// In Task
synchronized (PoolDemo._monitor) {
    if (Duration < FastestMemory) {
        FastestMemory = Duration;
    }

    if (Duration > SlowestMemory) {
        SlowestMemory = Duration;
    }
}

Расширенный подход:

Здесь мы используем классы java.util.concurrent.atomic вместо мониторов. Под тяжелым утверждением это должно работать лучше, чем подход synchronized. Попробуйте и посмотрите.

// At the top of everything
import java.util.concurrent.atomic.AtomicLong;    

. . . . 

// In PoolDemo
static AtomicLong FastestMemory = new AtomicLong(2000000);
static AtomicLong SlowestMemory = new AtomicLong(0);

. . . . .

// In Task
long temp = FastestMemory.get();       
while (Duration < temp) {
    if (!FastestMemory.compareAndSet (temp, Duration)) {
        temp = FastestMemory.get();       
    }
}

temp = SlowestMemory.get();
while (Duration > temp) {
    if (!SlowestMemory.compareAndSet (temp, Duration)) {
        temp = SlowestMemory.get();
    }
}

Позвольте мне знать, что произойдет после этого. Это может не решить вашу проблему, но условие гонки вокруг самых переменных, отслеживающих вашу производительность, слишком опасно для игнорирования.

Я изначально разместил это обновление в качестве комментария, но перевел его сюда, чтобы у меня было место для показа кода. Это обновление прошло через несколько итераций - благодаря A.H. для обнаружения ошибки, имевшейся у меня в более ранней версии. Все, что содержится в этом обновлении, заменяет что-либо в комментарии.

Наконец, отличный источник, охватывающий весь этот материал, Java concurrency на практике, лучшая книга на Java concurrency, и одна из лучших книг Java в целом.

Обновление 2 - более простое решение условий гонки

Недавно я заметил, что ваш текущий код никогда не завершится, если вы не добавите executorService.shutdown(). То есть, не-демона, живущие в этом пуле, должны быть завершены, иначе основной поток никогда не выйдет. Это заставило меня думать, что, поскольку нам нужно ждать выхода всех потоков, почему бы не сравнить их длительность после их завершения и, таким образом, обойти одновременное обновление FastestMemory и т.д. Вообще? Это проще и может быть быстрее; там больше нет блокировки или накладных расходов CAS, и вы все равно выполняете итерацию FileArray в конце вещей.

Другое, что мы можем использовать, заключается в том, что ваше параллельное обновление FileArray совершенно безопасно, поскольку каждый поток записывается в отдельную ячейку, и поскольку во время его написания нет чтения FileArray.

При этом вы вносите следующие изменения:

// In PoolDemo
// This part is the same, just so you know where we are
for(int i = 0; i<Iterations; i++) {
    Task t = new Task(i);
    executor.execute(t);
}

// CHANGES BEGIN HERE
// Will block till all tasks finish. Required regardless.
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);

for(int j=0; j<FileArray.length; j++){
    long duration = FileArray[j];
    TotalTime += duration;

    if (duration < FastestMemory) {
        FastestMemory = duration;
    }

    if (duration > SlowestMemory) {
        SlowestMemory = duration;
    }

    new PrintStream(fout).println(FileArray[j] + ",");
}

. . . 

// In Task
// Ending of Task.run() now looks like this
long Finish = System.currentTimeMillis();
long Duration = Finish - Start;
FileArray[this.ID] = (int)Duration;
System.out.println("Individual Time " + this.ID + " \t: " + (Duration) + " ms");

Дайте этому подходу и выстрел.

Вы должны обязательно проверить свой код на С# для аналогичных условий гонки.

Ответ 2

... но Java иногда принимает 2000 мс...

И

    byte[] list1 = new byte[Size1];
    byte[] list2 = new byte[Size2];
    byte[] list3 = new byte[Size3];

Hickups будет сборщиком мусора, который очистит ваши массивы. Если вы действительно хотите настроить, я предлагаю вам использовать какой-то кеш для массивов.

Edit

Этот

   System.out.println("Individual Time " + this.ID + " \t: " + (Duration) + " ms");

делает один или несколько synchronized внутренне. Таким образом, ваш очень "параллельный" код будет сериализован неплохо на этом этапе. Просто удалите его и повторите проверку.

Ответ 3

В то время как ответ @sparc_spread замечателен, я заметил следующее:

Я запускаю приложение с 1000 итерациями функции

Обратите внимание, что JVM HotSpot работает в режиме интерпретируется для первых 1.5k итераций любой функции в клиентском режиме и для итераций 10k в режиме сервера. Компьютеры с таким количеством ядер автоматически рассматриваются как "серверы" JVM HotSpot.

Это означало бы, что С# будет делать JIT (и запускать машинный код) до того, как будет работать Java, и имеет шанс повысить производительность в среде выполнения функции. Попробуйте увеличить итерации до 20000 и начать подсчет с 10k итераций.

Обоснование здесь состоит в том, что JVM собирает статистические данные о том, как лучше всего использовать JIT. Он полагает, что ваша функция будет работать много времени, поэтому для ускорения работы в целом требуется механизм "медленной начальной загрузки". Или, по их словам, "20% функций выполняются в 80% случаев", так зачем их использовать?

Ответ 5

Вы также можете посмотреть ExecutorService, созданный с помощью Executors.newFixedThreadPool(noOfCores) или аналогичного метода.