Параллельная и блокирующая очередь в Java
У меня есть классическая проблема потоковой передачи событий во входящую очередь второго потока. Только в этот раз меня очень интересует производительность. Я хочу достичь:
- Я хочу одновременный доступ к очереди, нажатие на устройство, приемник.
- Когда очередь пуста, я хочу, чтобы потребитель блокировал очередь, ожидая производителя.
Моя первая идея заключалась в использовании LinkedBlockingQueue
, но вскоре я понял, что это не одновременно, а производительность. С другой стороны, теперь я использую ConcurrentLinkedQueue
, но все же я оплачиваю стоимость wait()
/notify()
для каждой публикации. Поскольку потребитель, обнаружив пустую очередь, не блокируется, мне нужно синхронизировать и wait()
по блокировке. С другой стороны, продюсер должен получить этот замок и notify()
в каждой отдельной публикации. В целом, я оплачиваю
sycnhronized (lock) {lock.notify()}
в каждой отдельной публикации, даже если это не требуется.
Я думаю, что это необходимо, это очередь, которая является блокирующей и параллельной. Я полагаю, что операция push()
работает как в ConcurrentLinkedQueue
, с дополнительным notify()
для объекта, когда нажатый элемент является первым в списке. Такая проверка, которую я считаю уже существующей в ConcurrentLinkedQueue
, поскольку нажатие требует соединения со следующим элементом. Таким образом, это будет намного быстрее, чем синхронизация каждый раз на внешней блокировке.
Что-то вроде этого доступно/разумно?
Ответы
Ответ 1
Я думаю, вы можете придерживаться java.util.concurrent.LinkedBlockingQueue
независимо от ваших сомнений. Это одновременно. Хотя, я понятия не имею о его производительности. Возможно, другая реализация BlockingQueue
подойдет вам лучше. Их не так уж много, поэтому выполните тесты производительности и измерения.
Ответ 2
Подобно этому ответу fooobar.com/questions/196865/..., но немного по-другому.. Я закончил использование ExecutorService
. Вы можете создать экземпляр с помощью Executors.newSingleThreadExecutor()
. Мне нужна параллельная очередь для чтения/записи BufferedImages в файлы, а также атомарности с чтением и записью. Мне нужен только один поток, потому что файл IO на порядок быстрее, чем источник, net IO. Кроме того, меня больше беспокоило атомарность действий и правильность, чем производительность, но этот подход также может быть выполнен с несколькими потоками в пуле, чтобы ускорить работу.
Чтобы получить изображение (Try-Catch-finally опущено):
Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
@Override
public BufferedImage call() throws Exception {
ImageInputStream is = new FileImageInputStream(file);
return ImageIO.read(is);
}
})
image = futureImage.get();
Чтобы сохранить изображение (Try-Catch-finally опущено):
Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
FileOutputStream os = new FileOutputStream(file);
return ImageIO.write(image, getFileFormat(), os);
}
});
boolean wasWritten = futureWrite.get();
Важно отметить, что вы должны очистить и закрыть свои потоки в блоке finally. Я не знаю, как он работает по сравнению с другими решениями, но он довольно универсален.
Ответ 3
Я бы предложил вам посмотреть ThreadPoolExecutor newSingleThreadExecutor. Он будет обрабатывать ваши задачи, заказанные для вас, и если вы отправите Callables своему исполнителю, вы сможете получить поведение блокировки, которое вы ищете также.
Ответ 4
Вы можете попробовать LinkedTransferQueue из jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/
Он удовлетворяет вашим требованиям и имеет меньшие накладные расходы для операций предложения/опроса.
Как видно из кода, когда очередь не пуста, она использует атомарные операции для элементов опроса. И когда очередь пуста, она вращается в течение некоторого времени и припарковывает поток, если не удастся.
Я думаю, что это может помочь в вашем случае.
Ответ 5
Я использую ArrayBlockingQueue всякий раз, когда мне нужно передавать данные из одного потока в другой. Использование методов put и take (которые будут блокироваться, если они полны/пусты).
Ответ 6
Ниже приведен список классов, реализующих BlockingQueue
.
Я бы рекомендовал проверить SynchronousQueue
.
Подобно @Rorick, упомянутому в его комментарии, я считаю, что все эти реализации являются параллельными. Я думаю, что ваши проблемы с LinkedBlockingQueue
могут быть неуместными.