Могу ли я использовать поведение воровства ForkJoinPool, чтобы избежать тупика с головоломкой?

Тупик с головоломкой, связанный с потоком, встречается в обычном пуле потоков, если все потоки в пуле ждут завершения задач в очереди в одном пуле. ForkJoinPool избегает этой проблемы, воруя работу из других потоков из вызова join(), а не просто ждать. Например:

private static class ForkableTask extends RecursiveTask<Integer> {
    private final CyclicBarrier barrier;

    ForkableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    protected Integer compute() {
        try {
            barrier.await();
            return 1;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

@Test
public void testForkJoinPool() throws Exception {
    final int parallelism = 4;
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
        forkableTasks.add(new ForkableTask(barrier));
    }

    int result = pool.invoke(new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
            for (ForkableTask task : forkableTasks) {
                task.fork();
            }

            int result = 0;
            for (ForkableTask task : forkableTasks) {
                result += task.join();
            }
            return result;
        }
    });
    assertThat(result, equalTo(parallelism));
}

Но при использовании интерфейса ExecutorService для ForkJoinPool, кража работы, похоже, не возникает. Например:

private static class CallableTask implements Callable<Integer> {
    private final CyclicBarrier barrier;

    CallableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Integer call() throws Exception {
        barrier.await();
        return 1;
    }
}

@Test
public void testWorkStealing() throws Exception {
    final int parallelism = 4;
    final ExecutorService pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
    int result = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int result = 0;
            // Deadlock in invokeAll(), rather than stealing work
            for (Future<Integer> future : pool.invokeAll(callableTasks)) {
                result += future.get();
            }
            return result;
        }
    }).get();
    assertThat(result, equalTo(parallelism));
}

Из беглого взгляда на реализацию ForkJoinPool все обычные API-интерфейсы ExecutorService реализованы с использованием ForkJoinTask s, поэтому я не знаю, почему возникает взаимоблокировка.

Ответы

Ответ 1

Вы почти отвечаете на свой вопрос. Решением является утверждение, что " ForkJoinPool избегает этой проблемы, крадя работу с другими потоками изнутри вызова join() ". Всякий раз, когда потоки блокируются по какой-либо другой причине, кроме ForkJoinPool.join(), это кража работы не происходит, и потоки просто ждут и ничего не делают.

Причина этого в том, что в Java невозможно, чтобы ForkJoinPool предотвращал блокирование потоков и вместо этого предоставлял им что-то еще для работы. Сам поток должен избегать блокировки и вместо этого попросить пул работать, он должен это делать. И это реализовано только в ForkJoinTask.join(), а не в любом другом методе блокировки. Если вы используете Future в ForkJoinPool, вы также увидите тупик с голодом.

Почему кража работы выполняется только в ForkJoinTask.join() а не в каких-либо других методах блокировки в Java API? Ну, есть много таких методов блокировки (Object.wait(), Future.get(), любой из примитивов параллелизма в java.util.concurrent, методы ввода-вывода и т.д.), И они не имеют ничего общего с ForkJoinPool, который является просто произвольным классом в API, поэтому добавление особых случаев ко всем этим методам было бы плохим дизайном. Это также привело бы к возможно очень неожиданным и нежелательным последствиям. Представьте себе, например, пользователь, передающий задачу в ExecutorService, ожидающий в Future, а затем выяснение, что задача очень долго висит в Future.get() только потому, что Future.get() поток украл какой-то другой (долговременный) рабочий элемент вместо ожидая Future и продолжая сразу же после того, как результат будет доступен. Как только поток начинает работать над другой задачей, он не может вернуться к исходной задаче до завершения второй задачи. Таким образом, на самом деле хорошо, что другие методы блокировки не выполняют кражу работы. Для ForkJoinTask эта проблема не существует, потому что не важно, чтобы основная задача была продолжена как можно скорее, важно только, чтобы все задачи вместе обрабатывались максимально эффективно.

Также невозможно реализовать свой собственный метод для кражи работы внутри ForkJoinPool, потому что все соответствующие части не являются общедоступными.

Тем не менее, на самом деле существует второй метод, позволяющий предотвратить блокировку голода. Это называется управляемой блокировкой. Он не использует кражу работы (чтобы избежать проблемы, упомянутой выше), но также нуждается в потоке, который будет блокировать активное сотрудничество с пулом потоков. При управляемой блокировке поток сообщает пулу потоков, что он может быть заблокирован до того, как он вызовет метод потенциальной блокировки, а также проинформирует пул о завершении метода блокировки. Затем пул потоков знает, что существует опасность блокировки голода и может появляться дополнительные потоки, если все его потоки в настоящее время находятся в какой-то операции блокировки, и все еще есть другие задачи для выполнения. Обратите внимание, что это менее эффективно, чем кража работы, из-за накладных расходов дополнительных потоков. Если вы реализуете рекурсивный параллельный алгоритм с обычными фьючерсами и управляемой блокировкой вместо ForkJoinTask и кражи работы, количество дополнительных потоков может стать очень большим (потому что на фазе "разделить" алгоритм будет создано множество задач и для потоков, которые немедленно блокируют и ждут результатов от подзадач). Тем не менее, блокировка голода по-прежнему предотвращается, и это позволяет избежать проблемы, что задача должна ждать долгое время, потому что ее поток начал работать над другой задачей в среднем.

ForkJoinPool of Java также поддерживает управляемую блокировку. Чтобы использовать это, необходимо реализовать интерфейс ForkJoinPool.ManagedBlocker таким образом, чтобы потенциально-блокирующий метод, который хочет выполнить задача, вызывается из block метода этого интерфейса. Тогда задача может не вызвать метод блокировки напрямую, но вместо этого нужно вызвать статический метод ForkJoinPool.managedBlock(ManagedBlocker). Этот метод обрабатывает связь с пулом потоков до и после блокировки. Он также работает, если текущая задача не выполняется в ForkJoinPool, тогда она просто вызывает метод блокировки.

Единственное место, которое я нашел в Java API (для Java 7), которое фактически использует управляемую блокировку, - это класс Phaser. (Этот класс является барьером синхронизации, таким как мьютексы и защелки, но более гибким и мощным.) Таким образом, синхронизация с Phaser внутри задачи ForkJoinPool должна использовать управляемую блокировку и может избежать блокировок от голода (но ForkJoinTask.join() по-прежнему предпочтительнее, поскольку он использует кража работы вместо управляемой блокировки). Это работает независимо от того, используете ли вы ForkJoinPool напрямую или через интерфейс ExecutorService. Тем не менее, это не будет работать, если вы используете какой - либо другой ExecutorService, как те, которые создаются с помощью класса Executors, потому что они не поддерживают управляемую блокировку.

В Scala использование управляемой блокировки более широко распространено (описание, T):T rel=noreferrer>API).

Ответ 2

Я вижу, что вы делаете, но я не знаю, почему. Идея барьера настолько независима, что может ждать друг друга, чтобы достичь общей точки. У вас нет независимых потоков. Пулы потоков, F/J, предназначены для параллелизма данных

Вы делаете что-то более настроенное для параллелизма задач

Причина продолжения F/J заключается в том, что фреймворк создает "продолжения потоков", чтобы продолжить извлечение работы из deques, когда ожидают все рабочие потоки.