BlockingQueue - заблокированные методы дренажа()
В BlockingQueue есть метод, называемый drainTo(), но он не заблокирован. Мне нужна очередь, которую я хочу заблокировать, но также способную извлекать объекты в очереди одним способом.
Object first = blockingQueue.take();
if ( blockingQueue.size() > 0 )
blockingQueue.drainTo( list );
Я думаю, что вышеприведенный код будет работать, но я ищу элегантное решение.
Ответы
Ответ 1
Вы ссылаетесь на комментарий в JavaDoc:
Далее, поведение этой операции undefined, если указанная коллекция изменяется во время выполнения операции.
Я считаю, что это относится к коллекции list
в вашем примере:
blockingQueue.drainTo(list);
означает, что вы не можете изменить list
в то же время, когда вы сбрасываете с blockingQueue
в list
. Тем не менее, блокирующая очередь внутренне синхронизируется так, что, когда вызывается drainTo
, блокировка puts и (см. Примечание ниже). Если бы это не было сделано, тогда это было бы не по-настоящему безопасным для Thread. Вы можете посмотреть исходный код и убедиться, что drainTo
является потокобезопасным относительно самой блокирующей очереди.
В качестве альтернативы, вы имеете в виду, когда вы вызываете drainTo
, который хотите заблокировать, пока в очередь не будет добавлен хотя бы один объект? В этом случае у вас есть другой выбор, кроме:
list.add(blockingQueue.take());
blockingQueue.drainTo(list);
чтобы заблокировать до тех пор, пока не будет добавлен один или несколько элементов, а затем слейте всю очередь в коллекцию list
.
Примечание. Что касается Java 7, для get и puts используется отдельная блокировка. Операции ввода теперь разрешены во время дренажа (и нескольких других операций).
Ответ 2
Если вы используете Google Guava, есть отличный Queues.drain()
метод.
Сбрасывает очередь как BlockingQueue.drainTo(Collection, int)
, но если запрошенные элементы numElements
недоступны, он будет ждать до указанного таймаута.
Ответ 3
Я нашел этот шаблон полезным.
List<byte[]> blobs = new ArrayList<byte[]>();
if (queue.drainTo(blobs, batch) == 0) {
blobs.add(queue.take());
}
Ответ 4
Имея доступный API, я не думаю, что вы собираетесь стать более элегантным. Кроме того, вы можете удалить тест размера.
Если вы хотите атомарно извлекать непрерывную последовательность элементов, даже если другая операция удаления совпадает, я не верю, что даже drainTo
гарантирует это.
Ответ 5
Исходный код:
596: public int drainTo(Collection<? super E> c) {
//arg. check
603: lock.lock();
604: try {
608: for (n = 0 ; n != count ; n++) {
609: c.add(items[n]);
613: }
614: if (n > 0) {
618: notFull.signalAll();
619: }
620: return n;
621: } finally {
622: lock.unlock();
623: }
624: }
ArrayBlockingQueue хочет вернуть 0. Кстати, он мог сделать это, прежде чем принимать блокировку.