"Закрытие" блокирующей очереди
Im, используя java.util.concurrent.BlockingQueue в очень простом сценарии производителя и потребителя. Например. этот псевдо-код изображает потребительскую часть:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(true)
{
try {
ComplexObject complexObject = myBlockingQueue.take();
//do something with the complex object
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Пока все хорошо. В javadoc блокирующей очереди я читал:
A BlockingQueue не является внутренне поддерживать любые "близкие" или "выключение", чтобы указать, что больше предметов не будет добавлено. Нужды и использование таких функций, как правило, зависит от реализации. Например, общая тактика заключается в том, что производители вставить специальный конец потока или яд объекты, которые интерпретируются соответственно, когда их принимают потребители.
К сожалению, из-за использования дженериков и характера ComplexObject его нетривиально, чтобы вставить "ядовитый объект" в очередь. Так что эта "общая тактика" в моем сценарии не очень удобна.
Мой вопрос: какие еще хорошие тактики/шаблоны я могу использовать для "закрытия" очереди?
Спасибо!
Ответы
Ответ 1
Если у вас есть дескриптор потребительского потока, вы можете его прервать. С кодом, который вы дали, это убьет потребителя. Я бы не ожидал, что производитель получит это; ему, вероятно, придется обратный вызов программному контроллеру, чтобы сообщить ему об этом. Затем контроллер будет прерывать потребительский поток.
Вы всегда можете закончить работу перед тем, как послушать прерывание. Например:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().isInterrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
this.process(complexObject);
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
// Thread is getting ready to die, but first,
// drain remaining elements on the queue and process them.
final LinkedList<ComplexObject> remainingObjects;
myBlockingQueue.drainTo(remainingObjects);
for(ComplexObject complexObject : remainingObjects) {
this.process(complexObject);
}
}
private void process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
Я бы предпочел, чтобы так или иначе отравить очередь. Если вы хотите убить поток, попросите нить убить себя.
(Хорошо видеть, что кто-то правильно управляет InterruptedException
.)
Кажется, что есть некоторые сомнения относительно обработки прерываний здесь. Во-первых, я хотел бы, чтобы все прочитали эту статью: http://www.ibm.com/developerworks/java/library/j-jtp05236.html
Теперь, понимая, что никто на самом деле не читал это, вот сделка. Поток получает только InterruptedException
, если он в настоящий момент блокируется во время прерывания. В этом случае Thread.interrupted()
вернет false
. Если он не блокирует, он не получит это исключение, а вместо Thread.interrupted()
вернется true
. Таким образом, ваш защитник цикла должен абсолютно, независимо от того, проверьте Thread.interrupted()
или иным образом не потеряйте прерывание потока.
Итак, поскольку вы проверяете Thread.interrupted()
независимо от того, что вы, и вы вынуждены ловить InterruptedException
(и должны иметь дело с ним, даже если вы не были вынуждены), теперь у вас есть две области кода, которые обрабатывают одно и то же событие, прерывание потока. Один из способов справиться с этим - нормализовать их в одно условие, то есть либо проверка булевого состояния может вызывать исключение, либо исключение может устанавливать логическое состояние. Я выбираю позже.
Изменить: Обратите внимание, что статический метод прерывания Thread # очищает прерванный статус текущего потока.
Ответ 2
Альтернативой было бы обернуть обработку, которую вы делаете, с ExecutorService
, и пусть сам ExecutorService
определяет, добавляются ли задания в очередь.
В принципе, вы используете ExecutorService.shutdown()
, который при вызове запрещает выполнение каких-либо задач исполнителем.
Я не уверен, как вы в настоящее время отправляете задачи в QueueConsumer
в своем примере. Я предположил, что у вас есть какой-то метод submit()
, и в этом примере использовался аналогичный метод.
import java.util.concurrent.*;
class QueueConsumer {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void shutdown() {
executor.shutdown(); // gracefully shuts down the executor
}
// 'Result' is a class you'll have to write yourself, if you want.
// If you don't need to provide a result, you can just Runnable
// instead of Callable.
public Future<Result> submit(final ComplexObject complexObject) {
if(executor.isShutdown()) {
// handle submitted tasks after the executor has been told to shutdown
}
return executor.submit(new Callable<Result>() {
@Override
public Result call() {
return process(complexObject);
}
});
}
private Result process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
Этот пример представляет собой только ту же иллюстрацию, что предлагает пакет java.util.concurrent
; вероятно, есть некоторые оптимизации, которые могут быть сделаны для него (например, QueueConsumer
, поскольку его собственный класс, вероятно, даже не нужен, вы можете просто предоставить ExecutorService
всем производителям, отправляющим задания).
Выполните пакет java.util.concurrent
(начиная с некоторых ссылок выше). Вы можете обнаружить, что это дает вам множество отличных вариантов того, что вы пытаетесь сделать, и вам даже не нужно беспокоиться о регулировании рабочей очереди.
Ответ 3
Другая идея сделать это просто:
class ComplexObject implements QueueableComplexObject
{
/* the meat of your complex object is here as before, just need to
* add the following line and the "implements" clause above
*/
@Override public ComplexObject asComplexObject() { return this; }
}
enum NullComplexObject implements QueueableComplexObject
{
INSTANCE;
@Override public ComplexObject asComplexObject() { return null; }
}
interface QueueableComplexObject
{
public ComplexObject asComplexObject();
}
Затем используйте BlockingQueue<QueueableComplexObject>
в качестве очереди. Если вы хотите завершить обработку очереди, сделайте queue.offer(NullComplexObject.INSTANCE)
. Со стороны потребителя сделайте
boolean ok = true;
while (ok)
{
ComplexObject obj = queue.take().asComplexObject();
if (obj == null)
ok = false;
else
process(obj);
}
/* interrupt handling elided: implement this as you see fit,
* depending on whether you watch to swallow interrupts or propagate them
* as in your original post
*/
Нет instanceof
, и вам не нужно создавать поддельные ComplexObject
, которые могут быть дорогими/сложными в зависимости от его реализации.
Ответ 4
Другая возможность создания ядовитого объекта: сделайте его конкретным экземпляром класса. Таким образом, вам не нужно обманывать подтипы или испортить общий характер.
Недостаток: это не сработает, если между производителем и потребителем есть какой-то барьер сериализации.
public class ComplexObject
{
public static final POISON_INSTANCE = new ComplexObject();
public ComplexObject(whatever arguments) {
}
// Empty constructor for creating poison instance.
private ComplexObject() {
}
}
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().interrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
if (complexObject == ComplexObject.POISON_INSTANCE)
return;
// Process complex object.
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
}
}
Ответ 5
Возможно ли расширить объект ComplexObject и выкрикнуть нетривиальную функциональность создания? По сути, у вас заканчивается объект оболочки, но вы можете сделать, затем instance of
, чтобы увидеть, является ли конец объекта очереди.
Ответ 6
Вы можете обернуть ваш общий объект в объект данных. В этом объекте данных вы можете добавить дополнительные данные, такие как статус отравляющего объекта. Dataobject - это класс с двумя полями. T complexObject;
и boolean poison;
.
Ваш потребитель берет объекты данных из очереди. Если возвращен ядовитый объект, вы закрываете его, иначе вы разворачиваете общий и вызываете "процесс (complexObject)".
Я использую java.util.concurrent.LinkedBlockingDeque<E>
, чтобы вы могли добавить объект в конце очереди и перенести его с фронта. Таким образом, ваш объект будет обработан по порядку, но более важно безопасно закрыть очередь после запуска в ядовитый объект.
Чтобы поддерживать несколько потребителей, я добавляю объект яда обратно в очередь, когда я нахожу его.
public final class Data<T> {
private boolean poison = false;
private T complexObject;
public Data() {
this.poison = true;
}
public Data(T complexObject) {
this.complexObject = complexObject;
}
public boolean isPoison() {
return poison;
}
public T getComplexObject() {
return complexObject;
}
}
public class Consumer <T> implements Runnable {
@Override
public final void run() {
Data<T> data;
try {
while (!(data = queue.takeFirst()).isPoison()) {
process(data.getComplexObject());
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// add the poison object back so other consumers can stop too.
queue.addLast(line);
}
}
Ответ 7
В этой ситуации вам, как правило, приходится отказываться от дженериков и делать объект хранения в очереди. то вам просто нужно проверить свой "ядовитый объект" перед тем, как придать его фактическому типу.
Ответ 8
Мне кажется разумным реализовать тесный BlockingQueue
:
import java.util.concurrent.BlockingQueue;
public interface CloseableBlockingQueue<E> extends BlockingQueue<E> {
/** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */
public boolean isClosed();
/** Closes this queue; elements cannot be added to a closed queue. **/
public void close();
}
Было бы довольно просто реализовать это со следующими формами поведения (см. сводная таблица методов):
-
Вставить
-
Выдает исключение, Специальное значение:
Ведет себя как полная Queue
, ответственность вызывающего абонента для тестирования isClosed()
.
-
Блокировки:
Выбрасывает IllegalStateException
, если и когда закрыт.
-
Тайм-аут:
Возвращает false
, если и когда закрыто, ответственность вызывающего абонента для проверки isClosed()
.
-
Удалить
-
Выдает исключение, Специальное значение:
Ведет себя как пустая Queue
, ответственность вызывающего абонента для тестирования isClosed()
.
-
Блокировки:
Выбрасывает NoSuchElementException
, если и при закрытии.
-
Тайм-аут:
Возвращает null
, если и когда закрыто, ответственность вызывающего абонента для проверки isClosed()
.
-
Исследовать
Без изменений.
Я сделал это, отредактировав источник, найдя его на github.com.
Ответ 9
Я использовал эту систему:
ConsumerClass
private boolean queueIsNotEmpty = true;//with setter
...
do {
...
sharedQueue.drainTo(docs);
...
} while (queueIsNotEmpty || sharedQueue.isEmpty());
Когда заканчивается производитель, я установил в поле consumerObject, queueIsNotEmpty значение false
Ответ 10
Сегодня я решил эту проблему, используя объект-оболочку. Поскольку ComplexObject слишком сложный для подкласса, я завернул объект ComplexObject в объект ComplexObjectWrapper. Затем используется ComplexObjectWrapper как общий тип.
public class ComplexObjectWrapper {
ComplexObject obj;
}
public class EndOfQueue extends ComplexObjectWrapper{}
Теперь вместо BlockingQueue<ComplexObject>
я сделал BlockingQueue<ComplexObjectWrapper>
Поскольку у меня был контроль над Потребителем и Продюсером, это решение сработало для меня.