Как "отменить" CountDownLatch?

У меня есть несколько потребительских потоков, ожидающих на CountDownLatch размера 1, используя await(). У меня есть единственный продюсерский поток, который вызывает countDown(), когда он успешно завершит.

Это отлично работает, когда ошибок нет.

Однако, если производитель обнаруживает ошибку, я хотел бы, чтобы он мог сигнализировать об ошибке для потребительских потоков. В идеале я мог бы заставить продюсера назвать что-то вроде abortCountDown() и заставить всех потребителей получить InterruptedException или какое-то другое исключение. Я не хочу называть countDown(), потому что для этого требуется, чтобы все мои потребительские потоки выполняли дополнительную ручную проверку на успех после их вызова await(). Я бы предпочел, чтобы они просто получили исключение, которое они уже знают, как обращаться.

Я знаю, что функция прерывания недоступна в CountDownLatch. Есть ли другой примитив синхронизации, который я могу легко адаптировать для эффективного создания CountDownLatch, который поддерживает отмену обратного отсчета?

Ответы

Ответ 1

JB Nizet получил отличный ответ. Я взял его и немного отполировал. Результатом является подкласс CountDownLatch с именем AbortableCountDownLatch, который добавляет к классу метод "abort()", который заставит все потоки, ожидающие на защелке, получить исключение AbortException (подкласс InterruptedException).

Кроме того, в отличие от класса JB, AbortableCountDownLatch будет прервать все блокирующие потоки немедленно при прерывании, а не ждать, пока обратный отсчет достигнет нуля (для ситуаций, когда вы используете счетчик > 1).

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortableCountDownLatch extends CountDownLatch {
    protected boolean aborted = false;

    public AbortableCountDownLatch(int count) {
        super(count);
    }


   /**
     * Unblocks all threads waiting on this latch and cause them to receive an
     * AbortedException.  If the latch has already counted all the way down,
     * this method does nothing.
     */
    public void abort() {
        if( getCount()==0 )
            return;

        this.aborted = true;
        while(getCount()>0)
            countDown();
    }


    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final boolean rtrn = super.await(timeout,unit);
        if (aborted)
            throw new AbortedException();
        return rtrn;
    }

    @Override
    public void await() throws InterruptedException {
        super.await();
        if (aborted)
            throw new AbortedException();
    }


    public static class AbortedException extends InterruptedException {
        public AbortedException() {
        }

        public AbortedException(String detailMessage) {
            super(detailMessage);
        }
    }
}

Ответ 2

Инкапсулируйте это поведение внутри определенного класса более высокого уровня, используя встроенную функцию CountDownLatch:

public class MyLatch {
    private CountDownLatch latch;
    private boolean aborted;
    ...

    // called by consumers
    public void await() throws AbortedException {
        latch.await();
        if (aborted) {
            throw new AbortedException();
        }
    }

    // called by producer
    public void abort() {
        this.aborted = true;
        latch.countDown();
    }

    // called by producer
    public void succeed() {
        latch.countDown();
    }
}

Ответ 3

Вы можете создать обертку вокруг CountDownLatch, которая предоставляет возможность отменить официантов. Он должен будет отслеживать ожидающие потоки и отпускать их при тайм-ауте, а также помнить, что защелка была отменена, поэтому будущие вызовы await будут прерываться немедленно.

public class CancellableCountDownLatch
{
    final CountDownLatch latch;
    final List<Thread> waiters;
    boolean cancelled = false;

    public CancellableCountDownLatch(int count) {
        latch = new CountDownLatch(count);
        waiters = new ArrayList<Thread>();
    }

    public void await() throws InterruptedException {
        try {
            addWaiter();
            latch.await();
        }
        finally {
            removeWaiter();
        }
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            addWaiter();
            return latch.await(timeout, unit);
        }
        finally {
            removeWaiter();
        }
    }

    private synchronized void addWaiter() throws InterruptedException {
        if (cancelled) {
            Thread.currentThread().interrupt();
            throw new InterruptedException("Latch has already been cancelled");
        }
        waiters.add(Thread.currentThread());
    }

    private synchronized void removeWaiter() {
        waiters.remove(Thread.currentThread());
    }

    public void countDown() {
        latch.countDown();
    }

    public synchronized void cancel() {
        if (!cancelled) {
            cancelled = true;
            for (Thread waiter : waiters) {
                waiter.interrupt();
            }
            waiters.clear();
        }
    }

    public long getCount() {
        return latch.getCount();
    }

    @Override
    public String toString() {
        return latch.toString();
    }
}

Ответ 4

Вы можете свернуть свой CountDownLatch с помощью ReentrantLock, который позволяет получить доступ к его защищенному getWaitingThreads методу.

Пример:

public class FailableCountDownLatch {
    private static class ConditionReentrantLock extends ReentrantLock {
        private static final long serialVersionUID = 2974195457854549498L;

        @Override
        public Collection<Thread> getWaitingThreads(Condition c) {
            return super.getWaitingThreads(c);
        }
    }

    private final ConditionReentrantLock lock = new ConditionReentrantLock();
    private final Condition countIsZero = lock.newCondition();
    private long count;

    public FailableCountDownLatch(long count) {
        this.count = count;
    }

    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (getCount() > 0) {
                countIsZero.await();
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean await(long time, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            if (getCount() > 0) {
                return countIsZero.await(time, unit);
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    public long getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public void countDown() {
        lock.lock();
        try {
            if (count > 0) {
                count--;

                if (count == 0) {
                    countIsZero.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void abortCountDown() {
        lock.lock();
        try {
            for (Thread t : lock.getWaitingThreads(countIsZero)) {
                t.interrupt();
            }
        } finally {
            lock.unlock();
        }
    }
}

Вы можете захотеть изменить этот класс, чтобы сбросить InterruptedException при новых вызовах на await после его отмены. Вы могли бы даже расширить этот класс CountDownLatch, если вам нужна эта функциональность.