ExecutorService: как предотвратить голодание, когда в потоках выполняются барьеры синхронизации

У меня есть ситуация, для которой у меня возникают проблемы, чтобы найти чистое решение. Я попытаюсь объяснить как можно подробнее.

У меня есть древовидная структура:

NODE A
    NODE A.1
    NODE A.2
        NODE A.2.a
        NODE A.2.b
    NODE A.3
        NODE A.3.a
        NODE A.3.b
        NODE A.3.c
NODE B
    NODE B.1
    NODE B.2

Мне нужно обработать корень node:

 public void process(final Node node) { ... }

Процесс node включает в себя две вещи:

- some database queries
- the process of all children of these nodes

Другими словами, после обработки NODE.2.a и NODE.2.b обрабатывается NODE.2. Я обрабатываю узлы рекурсивным образом, ничего впечатляющего.

До сих пор так хорошо. Теперь я хочу объявить глобальную службу исполнителей с фиксированным количеством потоков. Я хочу обрабатывать дочерние узлы node параллельно. Таким образом, NODE.2.a и NODE.2.b могут обрабатываться каждый в своих потоках. Код будет выглядеть примерно так:

// global executor service, shared between all process(Node) calls
final ExecutorService service = Executors.newFixedThreadPool(4);

public void process(final Node node) {
    // database queries
    ...

    // wait for children to be processed
    final CountDownLatch latch = new CountDownLatch(node.children().size());

    for (final Node child : node.children()) {
        service.execute(() -> {
             process(child);
             latch.countDown();
        });
    }
    latch.await();
}

Проблема здесь: при достижении определенной глубины все потоки останавливаются в latch.await(). Мы достигли ситуации голодного голода.

Это можно легко решить, сделав службу исполнителя неограниченной, но мне не нравится этот параметр. Я хочу контролировать количество потоков active. В моем случае количество потоков active будет равно числу ядер. Наличие более active потоков приведет к свопам из одного потока в другой, и я хотел бы избежать этого.

Как это можно решить?

Ответы

Ответ 1

Это технически тупик. Сначала мы думаем о тупике, когда два или более процесса ожидают блокировки, но в этом случае потоки ждут друг друга. Интересно, что это способ создать странного зверя самозахвата. Если у вас есть пул из одного потока, он будет отправлять задания и ждать их, но никогда не будет выполняться, потому что он ждет от себя, чтобы закончить их выполнение!

Стандартный ответ называется "Запуск пула потоков работ". Я столкнулся с одной и той же проблемой, и без этой части словарного запаса требуется возраст, чтобы найти какую-либо информацию о том, что я быстро разработал, была общей проблемой в любом рекурсивном алгоритме, который выполняется в полностью параллельной рекурсии.

ОК, это эскиз того, как он работает. Создайте довольно стандартный пул потоков с помощью одного трюка. Когда поток достигает точки, в которой он не может действовать без результатов очереди, проверьте, был ли этот элемент запущен другим потоком, и если он не выполнил его в текущем потоке (а не ожидал), в противном случае подождите поток, выполняющий этот элемент, чтобы завершить его.

Это очень просто, но может потребовать, чтобы вы создали свой собственный пул потоков, потому что готовые решения часто его не поддерживают.

Остерегайтесь, конечно, что типичный нетривиальный рекурсивный алгоритм будет разбит на многие другие подзадачи, чем у вас есть параллельные процессоры. Таким образом, имеет смысл вставлять подзадачи на некоторый уровень, а затем просто выполнять остаток в одном потоке.

То есть, если вы не ставите в очередь и выставляете предметы, вы можете потратить время на то, чтобы положить предметы в очередь, чтобы вернуть их. Эти операции могут быть сериализованы между потоками и уменьшить parallelism. Трудно использовать блокировки без очереди, потому что они обычно не позволяют извлечь из середины очереди (как требуется здесь). Вы должны ограничить, насколько глубоко вы проходите параллельно.

С положительной стороны обратите внимание, что выполнение задачи в текущем исполняемом потоке связано с меньшими затратами на замену задач, чем на парковку текущего потока и (на уровне ОС) замену другого рабочего.

Вот ресурс Java, который предоставляет WorkStealingThreadPool. Пожалуйста, обратите внимание, что я не предпринял никаких усилий для оценки реализации и не предлагаю никаких рекомендаций. Я работал в С++ в этом случае или с удовольствием поделился бы мной template.

https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/

Также см. то, что Wikipedia должно сказать: https://en.wikipedia.org/wiki/Work_stealing

Ответ 2

Я бы не использовал многочисленные CountDownLatch-es, подверженные ошибкам. Насколько я понимаю, вы хотите пересекать дерево из узлов листьев в родительские узлы параллельно.

Я бы пересекал дерево, используя простой BFS/DFS (вам не нужен рекурсивный алгоритм там), как только лист найден без дочерних узлов, поместите это node в блокировку очереди.

Очередь может быть опрошена вторым потоком, этот поток назначит новую задачу службе-исполнителю с фиксированным числом потоков.

Как только исполнитель-поток завершает обработку текущего node, взятого из очереди, поток-исполнитель проверяет, имеет ли текущий node родительские узлы. Если есть родительский node, поместите его в очередь снова. Также вы должны проверить, был ли родитель уже обработан.

Что-то вроде этого:

    BlockingQueue blockingQueue = new BlockingQueue();
    ExecutorService service = 

    new Thread(new Runnable(){
        while (true) {
            Task task = blockingQueue.poll();
            service.execute(new Runnable(){
                if (task.isProcessed())  {
                   return ; 
                }
                ... do you job
                task.setProcessed(true);
                Node node = task.getNode();


                boolean allChildrenProcessed = true;
                for (Node childeNode: node.getChildren()) {
                    allChildrenProcessed &= childeNode.isProcessed();
                }

                if (node.hasParent() && allChildrenProcessed) {
                    blockingQueue.put(node.getParent())
                }

            });
        }
    }).start();

    Stack stack = new StackImpl();

    stack.put(root);
    while (node = stack.pop() != null) {
        for (Node child: node.getChildren()) {
            stack.push(child);
        }
        if (node.getChildren().isEmpty()) {
            // add leaf node for processing
            blockinQueue.add(node);
        }
    }

Ответ 3

Кажется, что вам нужна альтернатива рекурсии. Эта альтернатива - сбор рассеяния. Отправьте запрос, который открывает все задачи. Когда выполняется NODE.2, он просто возвращается, поскольку дочерние узлы не завершены. Когда последняя задача завершается (2.1, 2.b), обработка завершений начинает обрабатывать NODE.2.

Я поддерживаю продукт Parallel с открытым исходным кодом, который может сделать именно это. Вместо того, чтобы извлекать весь продукт, просто загрузите документацию (http://coopsoft.com/JavaDoc.html) и посмотрите файл: Manual/DynamicJoin.html. Если это решает вашу проблему, вы получаете продукт.

Ответ 4

То, что вы действительно хотите, может быть достигнуто с помощью

int cores = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(cores);

Помните также, что javadoc getActiveCount() говорит, что это приблизительное число.