Какова самая быстрая циклическая синхронизация в Java (ExecutorService против CyclicBarrier vs. X)?

Какая конструкция синхронизации Java, вероятно, обеспечит наилучшую производительности для параллельного, итеративного сценария обработки с фиксированное количество потоков, как указано ниже? После эксперимента (я использую ExecutorService и CyclicBarrier) и будучи несколько удивлен результатами, я был бы признателен за некоторые экспертные советы и, возможно, некоторые новые идеи. Существующие вопросы здесь по-видимому, не фокусируются прежде всего на производительности, а следовательно, и на новой. Заранее спасибо!

Ядром приложения является простой итеративный алгоритм обработки данных, распараллеливается с распределением вычислительной нагрузки через 8 ядер на Mac Pro, работающий под управлением ОС X 10.6 и Java 1.6.0_07. Данные, подлежащие обработке разделяется на 8 блоков, и каждый блок подается в Runnable для выполнения одним фиксированным числом потоков. Распараллеливание алгоритма довольно прост, и он функционально работает по желанию, но его производительность еще не так, как я думаю. Приложение кажется тратить много времени на синхронизацию системных вызовов, поэтому после некоторых Профилирование Интересно, выбрал ли я наиболее подходящий синхронизационный механизм (ы).

Ключевым требованием алгоритма является то, что он должен действовать в этапов, поэтому потоки должны синхронизироваться в конце каждого этапа. Основная нить подготавливает работу (очень низкие накладные расходы), передает ее потоки, позволяет им работать над ним, а затем продолжается, когда все потоки выполняются, переупорядочивают работу (опять-таки очень низкие накладные расходы) и повторяются цикл. Машина посвящена этой задаче, сбор мусора сводится к минимуму с помощью пулов с пулами выделенных заранее ресурсов и количество потоков может быть исправлено (нет входящих запросов или тому подобного, всего один поток на процессорное ядро).

V1 - ExecutorService

Моя первая реализация использовала ExecutorService с 8 работниками потоки. Программа создает 8 задач, выполняющих работу, а затем позволяет им работать над этим, примерно так:

// create one thread per CPU
executorService = Executors.newFixedThreadPool( 8 );
...
// now process data in cycles
while( ...) {
    // package data into 8 work items
    ...

    // create one Callable task per work item
    ...

    // submit the Callables to the worker threads
    executorService.invokeAll( taskList );
}

Это работает нормально (оно делает то, что должно), и для очень большие рабочие элементы, действительно, все 8 процессоров становятся очень загруженными, поскольку так как ожидается, что алгоритм обработки позволит (некоторые рабочие элементы будут заканчиваться быстрее других, а затем простаивать). Однако, поскольку рабочие предметы становятся меньше (и это не совсем управление программой), загрузка процессора пользователя резко сокращается:

blocksize | system | user | cycles/sec
256k        1.8%    85%     1.30
64k         2.5%    77%     5.6
16k         4%      64%     22.5
4096        8%      56%     86
1024       13%      38%     227
256        17%      19%     420
64         19%      17%     948
16         19%      13%     1626

Легенда:  - размер блока = размер рабочего элемента (= расчетные этапы)  - system = загрузка системы, как показано в OS X Activity Monitor (красная полоса)  - user = пользовательская загрузка, как показано в OS X Activity Monitor (зеленая панель)  - Циклы/сек = итерации через основной цикл while, лучше -

Основная проблема, вызывающая озабоченность, заключается в высоком проценте времени в системе, которая, по-видимому, управляется синхронизацией потоков звонки. Как и ожидалось, для небольших рабочих элементов ExecutorService.invokeAll() потребует относительно больших усилий для синхронизации потоков по сравнению с объемом работы, выполняемой в каждом потоке. Но поскольку ExecutorService является более общим, чем это необходимо для этого случая использования (он может ставить задачи для потоков, если есть больше задач, чем ядер), хотя, возможно, будет более компактный конструкция синхронизации.

V2 - CyclicBarrier

Следующая реализация использовала CyclicBarrier для синхронизации нити перед получением работы и после ее завершения, примерно следующим образом:

main() {
    // create the barrier
    barrier = new CyclicBarrier( 8 + 1 );

    // create Runable for thread, tell it about the barrier
    Runnable task = new WorkerThreadRunnable( barrier );

    // start the threads
    for( int i = 0; i < 8; i++ )
    {
        // create one thread per core
        new Thread( task ).start();
    }

    while( ... ) {
        // tell threads about the work
        ...

        // N threads + this will call await(), then system proceeds
        barrier.await();

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; }

    public void run()
    {
        while( true )
        {
            // wait for work
            barrier.await();

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Опять же, это работает хорошо функционально (он делает то, что должен), и для очень больших рабочих элементов действительно все 8 процессоров становятся высоко как и раньше. Однако, поскольку рабочие элементы становятся меньше, нагрузка по-прежнему резко сокращается:

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.7%     78%    6.1
16k         5.5%     52%    25
4096        9%       29%    64
1024       11%       15%    117
256        12%        8%    169
64         12%        6.5%  285
16         12%        6%    377

Для больших рабочих элементов синхронизация пренебрежимо мала, и производительность идентична V1. Но неожиданно результаты (узкоспециализированный) CyclicBarrier кажутся MUCH WORSE, чем для общего (общего) ExecutorService: пропускная способность (циклы/сек) составляет только 1/4-го от V1. Предварительный вывод что, хотя это, кажется, рекламируемое идеальное использование для CyclicBarrier, он выполняет намного хуже, чем generic ExecutorService.

V3 - Wait/Notify + CyclicBarrier

Кажется, стоит попробовать заменить первый циклический барьер() с простым механизмом ожидания/уведомления:

main() {
    // create the barrier
    // create Runable for thread, tell it about the barrier
    // start the threads

    while( ... ) {
        // tell threads about the work
        // for each: workerThreadRunnable.setWorkItem( ... );

        // ... now worker threads work on the work...

        // wait for worker threads to finish
        barrier.await();
    }
}

class WorkerThreadRunnable implements Runnable {
    CyclicBarrier barrier;
    @NotNull volatile private Callable<Integer> workItem;

    WorkerThreadRunnable( CyclicBarrier barrier ) { this.barrier = barrier; this.workItem = NO_WORK; }

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        synchronized( this )
        {
            workItem = callable;
            notify();
        }
    }

    public void run()
    {
        while( true )
        {
            // wait for work
            while( true )
            {
                synchronized( this )
                {
                    if( workItem != NO_WORK ) break;

                    try
                    {
                        wait();
                    }
                    catch( InterruptedException e ) { e.printStackTrace(); }
                }
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Опять же, это работает хорошо функционально (он делает то, что должен).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.4%     80%    6.3
16k         4.6%     60%    30.1
4096        8.6%     41%    98.5
1024       12%       23%    202
256        14%       11.6%  299
64         14%       10.0%  518
16         14.8%      8.7%  679

Пропускная способность для небольших рабочих элементов по-прежнему намного хуже, чем службы ExecutorService, но примерно в 2 раза от CyclicBarrier. Устранение одного CyclicBarrier устраняет половину зазора.

V4 - ожидание ожидания вместо ожидания/уведомления

Поскольку это приложение является основным, работающим в системе, и в любом случае, ядра не работают, если они не заняты рабочим элементом, почему бы не попробовать занятый ожидание рабочих элементов в каждом потоке, даже если который вращает процессор без необходимости. Изменен код рабочего потока следующим образом:

class WorkerThreadRunnable implements Runnable {
    // as before

    final protected void
    setWorkItem( @NotNull final Callable<Integer> callable )
    {
        workItem = callable;
    }

    public void run()
    {
        while( true )
        {
            // busy-wait for work
            while( true )
            {
                if( workItem != NO_WORK ) break;
            }

            // do the work
            ...

            // wait for everyone else to finish
            barrier.await();
        }
    }
}

Также работает хорошо функционально (он делает то, что должен).

blocksize | system | user | cycles/sec
256k        1.9%     85%    1.30
64k         2.2%     81%    6.3
16k         4.2%     62%     33
4096        7.5%     40%    107
1024       10.4%     23%    210
256        12.0%    12.0%   310
64         11.9%    10.2%   550
16         12.2%     8.6%   741

Для небольших рабочих элементов это увеличивает пропускную способность 10% по сравнению с вариантом CyclicBarrier + wait/notify, который не является незначительный. Но он по-прежнему намного ниже, чем V1 с ExecutorService.

V5 -?

Итак, каков наилучший механизм синхронизации для такого (предположительно не редкость) проблема? Я устал писать свои собственный механизм синхронизации для полной замены ExecutorService (предполагая, что он слишком общий, и должно быть что-то которые все еще можно извлечь, чтобы сделать его более эффективным). Это не моя область знаний, и я обеспокоен тем, что тратить много времени на отладку (поскольку я даже не уверен мои ожидания/уведомления и ожидание вариантов верны) для неопределенный выигрыш.

Приветствуются любые советы.

Ответы

Ответ 1

Кажется, что вам не нужна синхронизация между рабочими. Возможно, вам стоит рассмотреть возможность использования инфраструктуры ForkJoin, доступной на Java 7, а также отдельной библиотеки. Некоторые ссылки:

Ответ 2

Обновление: V6 - Занят Ожидание, при этом основной поток также работает

Очевидное улучшение на V5 (ожидание ожидания работы в 7 рабочих потоках, занятое ожидание завершения в основном потоке), казалось, снова разделило работу на 7 + 1 части и позволило основному потоку обрабатывать одну часть одновременно с другой рабочие потоки (а не просто ожидание), а затем - ожидание - до завершения всех рабочих элементов других потоков. Это будет использовать 8-й процессор (в примере 8-ядерная конфигурация) и добавить его циклы в доступный пул ресурсов вычислений.

Это было действительно прямо для реализации. И результаты действительно снова немного лучше:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.39
64k         1.0%     98%       6.8
16k         1.0%     98%      50.4
4096        1.0%     98%     372
1024        1.0%     98%    1317
256         1.0%     98%    3546
64          1.5%     98%    9091
16          2.0%     98%   16949

Таким образом, это покажется лучшим решением.

Ответ 3

Обновление: V5 - Занят Ожидание во всех потоках (пока кажется оптимальным)

Поскольку все ядра посвящены этой задаче, казалось, стоит попытаться просто устранить все сложные конструкции синхронизации и сделать ожидание в каждой точке синхронизации во всех потоках. Это, как оказалось, превосходит все другие подходы.

Настройка выполняется следующим образом: начните с V4 выше (CyclicBarrier + Busy Wait). Замените CyclicBarrier на AtomicInteger, чтобы основной поток сбрасывался до нуля каждого цикла. Каждый рабочий поток Runnable, который завершает свою работу, увеличивает атомное целое на единицу. Ожидается основной поток:

while( true ) {
    // busy-wait for threads to complete their work
    if( atomicInt.get() >= workerThreadCount ) break;
}

Вместо 8 запускается только 7 рабочих потоков (поскольку все потоки, включая основной поток, теперь полностью загружают ядро). Результаты следующие:

blocksize | system | user | cycles/sec
256k        1.0%     98%       1.36
64k         1.0%     98%       6.8
16k         1.0%     98%      44.6
4096        1.0%     98%     354
1024        1.0%     98%    1189
256         1.0%     98%    3222
64          1.5%     98%    8333
16          2.0%     98%   16129

Использование wait/notify в рабочих потоках снижает пропускную способность примерно до 1/3 этого решения.

Ответ 4

Мне также интересно, можете ли вы попробовать более 8 потоков. Если ваш процессор поддерживает HyperThreading, то (по крайней мере теоретически) вы можете сжать 2 потока на ядро ​​и посмотреть, что из этого выйдет.

Ответ 5

Обновление: V7 - Занят Ожидание, которое возвращается в Wait/Notify

После некоторого разговора с V6 выясняется, что занятый ждет, пока не профилируются реальные горячие точки приложения. Кроме того, вентилятор системы постоянно переходит в овердрайв, даже если рабочие элементы не обрабатываются. Таким образом, дальнейшее улучшение состояло в том, чтобы ожидание рабочих элементов в течение фиксированного периода времени (скажем, около 2 миллисекунд), а затем для возврата к комбинации "более приятного" ожидания()/notify(). Рабочие потоки просто публикуют свой текущий режим ожидания в основной поток через атомное логическое значение, указывающее, заняты ли они ожиданиями (и, следовательно, просто нужен рабочий элемент, который нужно установить), или ожидают ли они вызова для уведомления(), поскольку они находятся в ждать().

Другим усовершенствованием, которое оказалось довольно прямолинейным, было позволить потокам, которые завершили свой основной рабочий элемент, повторно вызывать обратный вызов, предоставленный клиентом, в то время как они ожидают, что другие потоки завершат свои основные рабочие элементы. Таким образом, время ожидания (которое происходит из-за того, что потоки связаны с немного разными рабочими нагрузками) не обязательно должно быть полностью потеряно для приложения.

Мне все еще очень интересно слышать от других пользователей, которые столкнулись с аналогичным вариантом использования.

Ответ 6

Просто нажмите на эту тему, и хотя это почти год назад, позвольте мне указать вам на библиотеку "jbarrier", которую мы разработали в Боннском университете несколько месяцев назад:

http://net.cs.uni-bonn.de/wg/cs/applications/jbarrier/

Пакет барьеров нацелен на случай, когда число рабочих потоков равно <= количество ядер. Пакет основан на оживленном ожидании, он поддерживает не только барьерные действия, но и глобальные сокращения, и помимо центрального барьера он предлагает древовидные барьеры для параллелизации деталей синхронизации/восстановления еще больше.