Ответ 1
Быстрый ответ: да, они потокобезопасны. Но не оставляйте его там...
Во-первых, небольшой домик, BlockingQueue
- это интерфейс, и любая реализация, которая не является потокобезопасной, будет нарушать документированный контракт. Ссылка, которую вы включили, ссылалась на LinkedBlockingQueue
, которая имеет некоторую уловку.
Ссылка которую вы включили, делает интересное наблюдение, да есть две блокировки внутри LinkedBlockingQueue
. Тем не менее, он не понимает, что краевой случай, когда "простая" реализация напала на фол, на самом деле обрабатывается, поэтому методы take and put более сложны, чем можно было бы ожидать вначале.
LinkedBlockingQueue
оптимизирован, чтобы избежать использования одной и той же блокировки как при чтении, так и в записи, это уменьшает конкуренцию, однако для правильного поведения он полагается на то, что очередь не пуста. Когда в очереди есть элементы внутри, то нажатие и поп-точки не находятся в одной области памяти, и соперничество можно избежать. Однако, когда очередь пуста, конкуренция не может быть устранена, поэтому необходим дополнительный код для обработки этого общего "края". Это общий компромисс между сложностью кода и производительностью/масштабируемостью.
Затем возникает вопрос, как LinkedBlockingQueue
знает, когда очередь пуста/не пуста и, таким образом, обрабатывает потоки? Ответ заключается в том, что он использует AtomicInteger
и Condition
как две дополнительные параллельные структуры данных. Элемент AtomicInteger
используется для проверки того, равна ли длина очереди нулевым, и Условие используется для ожидания сигнала для уведомления ожидающего потока, когда очередь, вероятно, находится в желаемом состоянии. Эта дополнительная координация имеет накладные расходы, однако в измерениях было показано, что при увеличении количества одновременных потоков, что накладные расходы этого метода ниже, чем конкуренция, которая вводится с помощью одной блокировки.
Ниже я скопировал код из LinkedBlockingQueue
и добавил комментарии, объясняющие, как они работают. На высоком уровне take()
сначала блокирует все остальные вызовы take()
, а затем сигналы put()
по мере необходимости. put()
работает аналогичным образом, сначала он блокирует все остальные вызовы put()
, а затем сигналы take()
при необходимости.
Из метода put()
:
// putLock coordinates the calls to put() only; further coordination
// between put() and take() follows below
putLock.lockInterruptibly();
try {
// block while the queue is full; count is shared between put() and take()
// and is safely visible between cores but prone to change between calls
// a while loop is used because state can change between signals, which is
// why signals get rechecked and resent.. read on to see more of that
while (count.get() == capacity) {
notFull.await();
}
// we know that the queue is not full so add
enqueue(e);
c = count.getAndIncrement();
// if the queue is not full, send a signal to wake up
// any thread that is possibly waiting for the queue to be a little
// emptier -- note that this is logically part of 'take()' but it
// has to be here because take() blocks itself
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
От take()
takeLock.lockInterruptibly();
try {
// wait for the queue to stop being empty
while (count.get() == 0) {
notEmpty.await();
}
// remove element
x = dequeue();
// decrement shared count
c = count.getAndDecrement();
// send signal that the queue is not empty
// note that this is logically part of put(), but
// for thread coordination reasons is here
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();