Java BlockingQueue с дозированием?
Меня интересует структура данных, идентичная Java BlockingQueue, за исключением того, что она должна иметь возможность пакетных объектов в очереди. Другими словами, я хотел бы, чтобы продюсер мог помещать объекты в очередь, но имеет потребительский блок на take()
, пока очередь не достигнет определенного размера (размер партии).
Затем, как только очередь достигнет размера партии, производитель должен заблокировать на put()
до тех пор, пока потребитель не будет потреблять все элементы в очереди (в этом случае производитель начнет производить снова и потребительский блок до тех пор, пока пакет снова достигнут).
Существует ли аналогичная структура данных? Или я должен писать (что я не против), я просто не хочу тратить свое время, если там что-то есть.
UPDATE
Возможно, немного прояснить ситуацию:
Ситуация всегда будет следующей. В очереди может быть несколько продюсеров, добавляющих элементы, но никогда не будет более одного потребителя, берущего элементы из очереди.
Теперь проблема заключается в том, что несколько этих установок работают параллельно и последовательно. Другими словами, производители производят товары для нескольких очередей, в то время как потребители сами по себе могут также быть производителями. Это легче воспринимать как ориентированный график производителей, потребителей-производителей и, наконец, потребителей.
Причина, по которой производители должны блокировать до тех пор, пока очереди не будут пустыми (@Peter Lawrey), потому что каждый из них будет работать в потоке. Если вы оставите их просто создавать по мере того, как пространство станет доступным, вы столкнетесь с ситуацией, когда у вас слишком много потоков, которые пытаются обрабатывать слишком много вещей одновременно.
Может быть, связать это с службой исполнения может решить проблему?
Ответы
Ответ 1
Я предлагаю вам использовать BlockingQueue.drainTo(Collection, int). Вы можете использовать его с помощью take(), чтобы получить минимальное количество элементов.
Преимущество использования этого подхода заключается в том, что ваш размер партии динамически растет с рабочей нагрузкой, и продюсер не должен блокировать, когда потребитель занят. то есть он сам оптимизируется для латентности и пропускной способности.
Чтобы реализовать точно так, как было задано (что я думаю, это плохая идея), вы можете использовать SynchronousQueue с занятым потоком потребления.
то есть. потребляющая нить выполняет
list.clear();
while(list.size() < required) list.add(queue.take());
// process list.
Производитель блокирует, когда потребитель будет занят.
Ответ 2
Вот быстрая (= простая, но не полностью протестированная) реализация, которая, по моему мнению, может быть подходящей для ваших запросов - вы должны иметь возможность расширять ее, чтобы поддерживать полный интерфейс очереди, если вам нужно.
чтобы увеличить производительность, вы можете переключиться на ReentrantLock вместо использования "синхронизированного" ключевого слова..
public class BatchBlockingQueue<T> {
private ArrayList<T> queue;
private Semaphore readerLock;
private Semaphore writerLock;
private int batchSize;
public BatchBlockingQueue(int batchSize) {
this.queue = new ArrayList<>(batchSize);
this.readerLock = new Semaphore(0);
this.writerLock = new Semaphore(batchSize);
this.batchSize = batchSize;
}
public synchronized void put(T e) throws InterruptedException {
writerLock.acquire();
queue.add(e);
if (queue.size() == batchSize) {
readerLock.release(batchSize);
}
}
public synchronized T poll() throws InterruptedException {
readerLock.acquire();
T ret = queue.remove(0);
if (queue.isEmpty()) {
writerLock.release(batchSize);
}
return ret;
}
}
Надеюсь, вы сочтете это полезным.
Ответ 3
Не то, что я знаю. Если я правильно понял, вы хотите, чтобы продюсер работал (пока потребитель блокирован), пока он не заполнит очередь или потребитель для работы (пока производитель блокирует), пока он не очистит очередь. Если это случай, я могу предположить, что вам не нужна структура данных, а механизм блокировки одной стороны, в то время как другой работает в режиме mutex fasion. Вы можете заблокировать объект для этого и внутренне иметь логику полной или пустой, чтобы освободить блокировку и передать ее другой стороне. Короче говоря, вы должны написать сами:)
Ответ 4
Это звучит так, как RingBuffer работает в шаблоне LMAX Disruptor. Подробнее см. http://code.google.com/p/disruptor/.
Очень грубое объяснение - ваша основная структура данных - RingBuffer. Производители помещают данные в кольцевой буфер в последовательности, и потребители могут извлекать столько данных, сколько производитель вложил в буфер (таким образом, по сути, пакетная обработка). Если буфер заполнен, производитель блокирует до тех пор, пока потребитель не закончит и не освободит слоты в буфере.