Java BlockingQueue не имеет блокировки?
У меня есть блокирующая очередь объектов.
Я хочу написать поток, который блокируется до тех пор, пока в очереди не будет объекта. Аналогично функциональности, предоставляемой BlockingQueue.take().
Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть() и не удалить объект. Я хочу удалить объект, только если я успешно обработаю его.
Итак, мне нужна функция блокировки peek(). В настоящее время peek() просто возвращает, если очередь пуста в соответствии с javadocs.
Я что-то упустил? Есть ли другой способ достичь этой функциональности?
EDIT:
Любые мысли, если я просто использовал поточную безопасную очередь и заглянул и спал вместо этого?
public void run() {
while (!__exit) {
while (__queue.size() != 0) {
Object o = __queue.peek();
if (o != null) {
if (consume(o) == true) {
__queue.remove();
} else {
Thread.sleep(10000); //need to backoff (60s) and try again
}
}
}
Thread.sleep(1000); //wait 1s for object on queue
}
}
Обратите внимание, что у меня есть только один поток потребителей и один (отдельный) поток производителей. Я думаю, это не так эффективно, как использование BlockingQueue... Любые комментарии оценены.
Ответы
Ответ 1
Вы можете использовать LinkedBlockingDeque и физически удалить элемент из очереди (используя takeLast()
), но снова заменить его на конец очереди, если обработка завершилась неудачей с помощью putLast(E e)
. Между тем ваши "продюсеры" добавят элементы в фронт очереди с помощью putFirst(E e)
.
Вы всегда можете инкапсулировать это поведение в свою собственную реализацию Queue
и предоставить метод blockingPeek()
, который выполняет takeLast()
, за которым следует putLast()
за кулисами на базовом LinkedBlockingDeque
. Следовательно, с точки зрения вызывающего клиента элемент никогда не удаляется из вашей очереди.
Ответ 2
Однако, поскольку я не знаю, смогу ли я успешно обработать объект, я хочу просто заглянуть() и не удалить объект. Я хочу удалить объект, только если я успешно обработаю его.
В общем, он не является потокобезопасным. Что делать, если после peek()
и определить, что объект может быть успешно обработан, но перед тем, как take()
его удалить и обработать, другой поток принимает этот объект?
Ответ 3
Единственное, что я знаю об этом, это BlockingBuffer в Коллекции сообщества Apache:
Если вызов get или remove вызывается пустой буфер, вызывающий поток ожидает уведомления о том, что добавление или Операция addAll завершена.
get()
эквивалентен peek()
, а a Buffer
можно сделать как BlockingQueue
, украсив UnboundedFifoBuffer с a BlockingBuffer
Ответ 4
Не могли бы вы просто добавить очередь прослушивателя событий в свою очередь блокировки, а затем, когда что-то добавлено в (блокирующую) очередь, отправьте событие своим слушателям? Вы могли бы заблокировать поток, пока не будет вызван метод actionPerformed.
Ответ 5
Быстрый ответ заключается в том, что на самом деле нет способа заблокировать просмотр, бар, реализующий блокирующую очередь с блокировкой peek() самостоятельно.
Я что-то пропустил?
peek() может быть неприятным с concurrency -
- Если вы не можете обработать сообщение peek() 'd - оно будет оставлено в очереди, если у вас несколько пользователей.
- Кто собирается получить этот объект из очереди, если вы не можете его обработать?
- Если у вас несколько потребителей, вы получаете условие гонки между вами peek() 'ing и другим потоком также обрабатываете элементы, что приводит к дублированию обработки или хуже.
Похоже, вам может быть лучше удалить элемент и обработать его, используя
Шаблон цепочки ответственности
Изменить: re: ваш последний пример: если у вас есть только один потребитель, вы никогда не избавитесь от объекта в очереди - если он не обновится в среднем времени - в этом случае вам лучше быть очень осторожным безопасность потока и, вероятно, не должны помещать элемент в очередь в любом случае.
Ответ 6
Похоже, что у BlockingQueue нет функции, которую вы указываете.
Возможно, я попытаюсь немного заново создать проблему: что бы вы делали с объектами, которые вы не можете "правильно обработать"? Если вы просто оставите их в очереди, вам придется в какой-то момент вытащить их и разобраться с ними. Я бы рекомендовал либо выяснить, как их обрабатывать (обычно, если queue.get() дает какое-либо недопустимое или плохое значение, вы, вероятно, можете просто отбросить его на пол) или выбрать другую структуру данных, чем FIFO.