Java BlockingQueue с дозированием?

Меня интересует структура данных, идентичная Java BlockingQueue, за исключением того, что она должна иметь возможность пакетных объектов в очереди. Другими словами, я хотел бы, чтобы продюсер мог помещать объекты в очередь, но имеет потребительский блок на take(), пока очередь не достигнет определенного размера (размер партии).

Затем, как только очередь достигнет размера партии, производитель должен заблокировать на put() до тех пор, пока потребитель не будет потреблять все элементы в очереди (в этом случае производитель начнет производить снова и потребительский блок до тех пор, пока пакет снова достигнут).

Существует ли аналогичная структура данных? Или я должен писать (что я не против), я просто не хочу тратить свое время, если там что-то есть.


UPDATE

Возможно, немного прояснить ситуацию:

Ситуация всегда будет следующей. В очереди может быть несколько продюсеров, добавляющих элементы, но никогда не будет более одного потребителя, берущего элементы из очереди.

Теперь проблема заключается в том, что несколько этих установок работают параллельно и последовательно. Другими словами, производители производят товары для нескольких очередей, в то время как потребители сами по себе могут также быть производителями. Это легче воспринимать как ориентированный график производителей, потребителей-производителей и, наконец, потребителей.

Причина, по которой производители должны блокировать до тех пор, пока очереди не будут пустыми (@Peter Lawrey), потому что каждый из них будет работать в потоке. Если вы оставите их просто создавать по мере того, как пространство станет доступным, вы столкнетесь с ситуацией, когда у вас слишком много потоков, которые пытаются обрабатывать слишком много вещей одновременно.

Может быть, связать это с службой исполнения может решить проблему?

Ответы

Ответ 1

Я предлагаю вам использовать BlockingQueue.drainTo(Collection, int). Вы можете использовать его с помощью take(), чтобы получить минимальное количество элементов.

Преимущество использования этого подхода заключается в том, что ваш размер партии динамически растет с рабочей нагрузкой, и продюсер не должен блокировать, когда потребитель занят. то есть он сам оптимизируется для латентности и пропускной способности.


Чтобы реализовать точно так, как было задано (что я думаю, это плохая идея), вы можете использовать SynchronousQueue с занятым потоком потребления.

то есть. потребляющая нить выполняет

 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

Производитель блокирует, когда потребитель будет занят.

Ответ 2

Вот быстрая (= простая, но не полностью протестированная) реализация, которая, по моему мнению, может быть подходящей для ваших запросов - вы должны иметь возможность расширять ее, чтобы поддерживать полный интерфейс очереди, если вам нужно.

чтобы увеличить производительность, вы можете переключиться на ReentrantLock вместо использования "синхронизированного" ключевого слова..

public class BatchBlockingQueue<T> {

    private ArrayList<T> queue;
    private Semaphore readerLock;
    private Semaphore writerLock;
    private int batchSize;

    public BatchBlockingQueue(int batchSize) {
        this.queue = new ArrayList<>(batchSize);
        this.readerLock = new Semaphore(0);
        this.writerLock = new Semaphore(batchSize);
        this.batchSize = batchSize;
    }

    public synchronized void put(T e) throws InterruptedException {
        writerLock.acquire();
        queue.add(e);
        if (queue.size() == batchSize) {
            readerLock.release(batchSize);
        }
    }

    public synchronized T poll() throws InterruptedException {
        readerLock.acquire();
        T ret = queue.remove(0);
        if (queue.isEmpty()) {
            writerLock.release(batchSize);
        }
        return ret;
    }

}

Надеюсь, вы сочтете это полезным.

Ответ 3

Не то, что я знаю. Если я правильно понял, вы хотите, чтобы продюсер работал (пока потребитель блокирован), пока он не заполнит очередь или потребитель для работы (пока производитель блокирует), пока он не очистит очередь. Если это случай, я могу предположить, что вам не нужна структура данных, а механизм блокировки одной стороны, в то время как другой работает в режиме mutex fasion. Вы можете заблокировать объект для этого и внутренне иметь логику полной или пустой, чтобы освободить блокировку и передать ее другой стороне. Короче говоря, вы должны написать сами:)

Ответ 4

Это звучит так, как RingBuffer работает в шаблоне LMAX Disruptor. Подробнее см. http://code.google.com/p/disruptor/.

Очень грубое объяснение - ваша основная структура данных - RingBuffer. Производители помещают данные в кольцевой буфер в последовательности, и потребители могут извлекать столько данных, сколько производитель вложил в буфер (таким образом, по сути, пакетная обработка). Если буфер заполнен, производитель блокирует до тех пор, пока потребитель не закончит и не освободит слоты в буфере.