Тупик в ThreadPoolExecutor

Обнаружена ситуация, когда ThreadPoolExecutor припаркован в функции execute(Runnable), тогда как все потоки ThreadPool ждут в getTask func, workQueue пуст.

Есть ли у кого-нибудь идеи?

ThreadPoolExecutor создается с помощью ArrayBlockingQueue и corePoolSize == maximumPoolSize = 4

[Edit] Чтобы быть более точным, поток блокируется в ThreadPoolExecutor.exec(Runnable command) func. Он выполняет задачу, но не делает этого.

[Edit2] Исполнитель заблокирован где-то внутри рабочей очереди (ArrayBlockingQueue).

[Edit3] Столбец:

thread = front_end(224)
at sun.misc.Unsafe.park(Native methord)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114)
at
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653)
at net.listenThread.WorkersPool.execute(WorkersPool.java:45)

в то же время workQueue пуст (проверен с помощью удаленного отладки)

[Edit4] Код работает с ThreadPoolExecutor:

public WorkersPool(int size) {
  pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
      new ThreadFactory() {
        @NotNull
        private final AtomicInteger threadsCount = new AtomicInteger(0);

        @NotNull
        public Thread newThread(@NotNull Runnable r) {
          final Thread thread = new Thread(r);
          thread.setName("net_worker_" + threadsCount.incrementAndGet());
          return thread;
        }
      },

      new RejectedExecutionHandler() {
        public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) {
          Verify.warning("new task " + r + " is discarded");
        }
      });
  }

  public void execute(@NotNull Runnable task) {
    pool.execute(task);
  }

  public void stopWorkers() throws WorkersTerminationFailedException {
    pool.shutdownNow();
    try {
      pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      throw new WorkersTerminationFailedException("Workers-pool termination failed", e);
    }
  }
}

Ответы

Ответ 1

Похоже, что это ошибка с JVM старше 6u21. В скомпилированном исходном коде возникла проблема для некоторых (возможно, всех) ОС.

Из ссылки:

Ошибка вызвана отсутствием барьеров памяти в различных Parker:: park() пути, которые могут привести к потерянному пробуждению и зависанию. (Обратите внимание, что PlatformEvent:: парк, используемый встроенной синхронизацией, не уязвим к проблеме). -XX: + UseMembar создает обход, потому что барьер мембар в логике перехода состояния скрывает проблему в Паркер::. (то есть нет ничего плохого в использовании -UseMembar механизм, но + UseMembar скрывает ошибку Parker::). Это день-один ошибка с добавлением java.util.concurrent в JDK 5.0. Я разработал простой режим C отказа, и он кажется более вероятным проявляется на современных платформах AMD и Nehalem, вероятно, из-за более глубоких хранить буферы, которые занимают больше времени для слива. Я предусмотрел предварительное исправление для Doug Lea для Parker:: park, который, кажется, устраняет ошибку. Больной доставить это исправление во время выполнения. (Я также увеличу CR с помощью дополнительные тестовые примеры и более подробное объяснение). Вероятно, это хороший кандидат на задние порты.

Ссылка: Ошибка JVM

Обходные пути доступны, но вам, вероятно, будет лучше всего получить самую последнюю копию Java.

Ответ 2

Я не вижу блокировки в коде ThreadPoolExecutor execute(Runnable). Единственной переменной является workQueue. Какой тип BlockingQueue вы предоставили вашему ThreadPoolExecutor?

По теме взаимоблокировок:

Вы можете подтвердить, что это тупик, рассматривая Дамп полного потока, как это предусмотрено <ctrl><break> в Windows или kill -QUIT в системах UNIX.

После того, как у вас есть эти данные, вы можете изучить потоки. Вот уместная выдержка из статья Sun о рассмотрении дампов потоков (предлагаемое чтение):

Для зависания, заторможенных или замороженных программ: если вы считаете, что ваша программа висит, создайте трассировку стека и проверьте потоки в состояниях MW или CW. Если программа зашла в тупик, то некоторые из системных потоков, вероятно, будут отображаться как текущие потоки, потому что для JVM ничего не остается.

В более светлой заметке: если вы работаете в среде IDE, можете ли вы гарантировать, что в этих методах не будет точек останова.

Ответ 3

Этот тупик, вероятно, потому, что вы запускаете задачу из самого исполнителя. Например, вы отправляете одну задачу, и она запускает еще 4 задачи. Если у вас размер пула равен 4, то вы просто полностью переполняете его, а последняя задача будет ждать, пока кто-то из задач вернет значение. Но первая задача ждет завершения всех разветвленных задач.

Ответ 4

Как уже упоминалось, это похоже на нормальное поведение, ThreadPoolExecutor просто ждет, чтобы сделать какую-то работу. Если вы хотите остановить его, вам необходимо позвонить:

executor.shutdown()

чтобы завершить его, как правило, после него исполнитель.

Ответ 5

Источник кода библиотеки ниже (это на самом деле класс из http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip),
- немного сложная - добавлена ​​защита от повторных вызовов FutureTask, если я не ошибаюсь - но не похоже на тупик - очень простое использование ThreadPool:

package net.spy.memcached.transcoders;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.CachedData;
import net.spy.memcached.compat.SpyObject;

/**
 * Asynchronous transcoder.
 */
public class TranscodeService extends SpyObject {

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * Perform a decode.
     */
    public <T> Future<T> decode(final Transcoder<T> tc,
            final CachedData cachedData) {

        assert !pool.isShutdown() : "Pool has already shut down.";

        TranscodeService.Task<T> task = new TranscodeService.Task<T>(
                new Callable<T>() {
                    public T call() {
                        return tc.decode(cachedData);
                    }
                });

        if (tc.asyncDecode(cachedData)) {
            this.pool.execute(task);
        }
        return task;
    }

    /**
     * Shut down the pool.
     */
    public void shutdown() {
        pool.shutdown();
    }

    /**
     * Ask whether this service has been shut down.
     */
    public boolean isShutdown() {
        return pool.isShutdown();
    }

    private static class Task<T> extends FutureTask<T> {
        private final AtomicBoolean isRunning = new AtomicBoolean(false);

        public Task(Callable<T> callable) {
            super(callable);
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            this.run();
            return super.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            this.run();
            return super.get(timeout, unit);
        }

        @Override
        public void run() {
            if (this.isRunning.compareAndSet(false, true)) {
                super.run();
            }
        }
    }

}

Ответ 6

Определенно странно.

Но прежде чем писать собственную TPE, попробуйте:

  • другой BlockingQueue impl., например. LinkedBlockingQueue

  • укажите справедливость = true в ArrayBlockingQueue, т.е. используйте new ArrayBlockingQueue(n, true)

Из этих двух вариантов я выбрал бы второй, потому что очень странно, что offer() блокируется; одна причина, которая приходит в голову - политика планирования потоков на вашем Linux. Как предположение.