Как "отменить" CountDownLatch?
У меня есть несколько потребительских потоков, ожидающих на CountDownLatch
размера 1, используя await()
. У меня есть единственный продюсерский поток, который вызывает countDown()
, когда он успешно завершит.
Это отлично работает, когда ошибок нет.
Однако, если производитель обнаруживает ошибку, я хотел бы, чтобы он мог сигнализировать об ошибке для потребительских потоков. В идеале я мог бы заставить продюсера назвать что-то вроде abortCountDown()
и заставить всех потребителей получить InterruptedException или какое-то другое исключение. Я не хочу называть countDown()
, потому что для этого требуется, чтобы все мои потребительские потоки выполняли дополнительную ручную проверку на успех после их вызова await()
. Я бы предпочел, чтобы они просто получили исключение, которое они уже знают, как обращаться.
Я знаю, что функция прерывания недоступна в CountDownLatch
. Есть ли другой примитив синхронизации, который я могу легко адаптировать для эффективного создания CountDownLatch
, который поддерживает отмену обратного отсчета?
Ответы
Ответ 1
JB Nizet получил отличный ответ. Я взял его и немного отполировал. Результатом является подкласс CountDownLatch с именем AbortableCountDownLatch, который добавляет к классу метод "abort()", который заставит все потоки, ожидающие на защелке, получить исключение AbortException (подкласс InterruptedException).
Кроме того, в отличие от класса JB, AbortableCountDownLatch будет прервать все блокирующие потоки немедленно при прерывании, а не ждать, пока обратный отсчет достигнет нуля (для ситуаций, когда вы используете счетчик > 1).
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AbortableCountDownLatch extends CountDownLatch {
protected boolean aborted = false;
public AbortableCountDownLatch(int count) {
super(count);
}
/**
* Unblocks all threads waiting on this latch and cause them to receive an
* AbortedException. If the latch has already counted all the way down,
* this method does nothing.
*/
public void abort() {
if( getCount()==0 )
return;
this.aborted = true;
while(getCount()>0)
countDown();
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
final boolean rtrn = super.await(timeout,unit);
if (aborted)
throw new AbortedException();
return rtrn;
}
@Override
public void await() throws InterruptedException {
super.await();
if (aborted)
throw new AbortedException();
}
public static class AbortedException extends InterruptedException {
public AbortedException() {
}
public AbortedException(String detailMessage) {
super(detailMessage);
}
}
}
Ответ 2
Инкапсулируйте это поведение внутри определенного класса более высокого уровня, используя встроенную функцию CountDownLatch:
public class MyLatch {
private CountDownLatch latch;
private boolean aborted;
...
// called by consumers
public void await() throws AbortedException {
latch.await();
if (aborted) {
throw new AbortedException();
}
}
// called by producer
public void abort() {
this.aborted = true;
latch.countDown();
}
// called by producer
public void succeed() {
latch.countDown();
}
}
Ответ 3
Вы можете создать обертку вокруг CountDownLatch
, которая предоставляет возможность отменить официантов. Он должен будет отслеживать ожидающие потоки и отпускать их при тайм-ауте, а также помнить, что защелка была отменена, поэтому будущие вызовы await
будут прерываться немедленно.
public class CancellableCountDownLatch
{
final CountDownLatch latch;
final List<Thread> waiters;
boolean cancelled = false;
public CancellableCountDownLatch(int count) {
latch = new CountDownLatch(count);
waiters = new ArrayList<Thread>();
}
public void await() throws InterruptedException {
try {
addWaiter();
latch.await();
}
finally {
removeWaiter();
}
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
try {
addWaiter();
return latch.await(timeout, unit);
}
finally {
removeWaiter();
}
}
private synchronized void addWaiter() throws InterruptedException {
if (cancelled) {
Thread.currentThread().interrupt();
throw new InterruptedException("Latch has already been cancelled");
}
waiters.add(Thread.currentThread());
}
private synchronized void removeWaiter() {
waiters.remove(Thread.currentThread());
}
public void countDown() {
latch.countDown();
}
public synchronized void cancel() {
if (!cancelled) {
cancelled = true;
for (Thread waiter : waiters) {
waiter.interrupt();
}
waiters.clear();
}
}
public long getCount() {
return latch.getCount();
}
@Override
public String toString() {
return latch.toString();
}
}
Ответ 4
Вы можете свернуть свой CountDownLatch
с помощью ReentrantLock
, который позволяет получить доступ к его защищенному getWaitingThreads
методу.
Пример:
public class FailableCountDownLatch {
private static class ConditionReentrantLock extends ReentrantLock {
private static final long serialVersionUID = 2974195457854549498L;
@Override
public Collection<Thread> getWaitingThreads(Condition c) {
return super.getWaitingThreads(c);
}
}
private final ConditionReentrantLock lock = new ConditionReentrantLock();
private final Condition countIsZero = lock.newCondition();
private long count;
public FailableCountDownLatch(long count) {
this.count = count;
}
public void await() throws InterruptedException {
lock.lock();
try {
if (getCount() > 0) {
countIsZero.await();
}
} finally {
lock.unlock();
}
}
public boolean await(long time, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
if (getCount() > 0) {
return countIsZero.await(time, unit);
}
} finally {
lock.unlock();
}
return true;
}
public long getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
public void countDown() {
lock.lock();
try {
if (count > 0) {
count--;
if (count == 0) {
countIsZero.signalAll();
}
}
} finally {
lock.unlock();
}
}
public void abortCountDown() {
lock.lock();
try {
for (Thread t : lock.getWaitingThreads(countIsZero)) {
t.interrupt();
}
} finally {
lock.unlock();
}
}
}
Вы можете захотеть изменить этот класс, чтобы сбросить InterruptedException
при новых вызовах на await
после его отмены. Вы могли бы даже расширить этот класс CountDownLatch
, если вам нужна эта функциональность.