LinkedBlockingQueue против ConcurrentLinkedQueue
Мой вопрос связан с этим вопросом, который был задан ранее. В ситуациях, когда я использую очередь для связи между потоками производителя и потребителя, люди обычно рекомендуют использовать LinkedBlockingQueue
или ConcurrentLinkedQueue
?
Каковы преимущества/недостатки использования одного над другим?
Основное отличие, которое я вижу с точки зрения API, состоит в том, что LinkedBlockingQueue
может быть дополнительно ограничено.
Ответы
Ответ 1
Для потока производителей/потребителей я не уверен, что ConcurrentLinkedQueue
является даже разумным вариантом - он не реализует BlockingQueue
, который является основным интерфейсом для очереди производителей/потребителей IMO. Вам нужно будет позвонить poll()
, немного подождать, если вы ничего не нашли, а затем снова опробуйте и т.д., Что приведет к задержкам при поступлении нового элемента и неэффективности, когда он пуст (из-за ненужного пробуждения от сна).
Из документов для BlockingQueue:
Реализации BlockingQueue
предназначены для использования в основном для очередей производителей-потребителей
Я знаю, что он строго не говорит, что для очередей производителей-потребителей следует использовать только блокирующие очереди, но даже так...
Ответ 2
Этот вопрос заслуживает лучшего ответа.
Java ConcurrentLinkedQueue
основан на известном алгоритме Магеда М. Майкла и Майкла Л. Скотта для не- блокировка блокировки.
"Non-blocking" как термин здесь для конкурирующего ресурса (наша очередь) означает, что независимо от того, что делает планировщик платформы, например, прерывание потока, или если поток, о котором идет речь, слишком медленный, другие потоки, претендующие на тот же ресурс все равно сможет продвигаться вперед. Например, если блокировка задействована, поток, удерживающий блокировку, может быть прерван, и все потоки, ожидающие блокировки, будут заблокированы. Внутренние блокировки (ключевое слово synchronized
) в Java также могут иметь суровое наказание за производительность - например, при предвзятой блокировке, и у вас есть спор, или после VM решает "раздуть" блокировку после периода отсрочки отжима и блокировать конфликтующие потоки... вот почему во многих контекстах (сценарии низкого/среднего соперничества) выполнение сравнений и наборов по атомным ссылкам может быть намного более эффективным и это именно то, что делают многие неблокирующие структуры данных.
Java ConcurrentLinkedQueue
не только не блокирует, но обладает удивительным свойством, которое производитель не поддерживает с потребителем. В одном сценарии производителя/единого потребителя (SPSC) это действительно означает, что говорить об этом не будет. В сценарии множественного производителя/единого потребителя потребитель не будет бороться с производителями. У этой очереди есть соперничество, когда несколько продюсеров пытаются offer()
, но это concurrency по определению. Это в основном общая цель и эффективная неблокирующая очередь.
Поскольку это не является BlockingQueue
, ну, блокирование потока, ожидающего очереди в очереди, - это ужасно ужасный способ проектирования параллельных систем. Не. Если вы не можете понять, как использовать ConcurrentLinkedQueue
в сценарии "потребитель/производитель", просто переключитесь на абстракции более высокого уровня, например, на хорошую актерскую структуру.
Ответ 3
LinkedBlockingQueue
блокирует пользователя или производителя, когда очередь пуста или заполнена, а соответствующий поток потребителя/производителя ставится спать. Но эта функция блокировки поставляется с затратами: каждая операция "положить или принять" заблокирована между производителями или потребителями (если их много), поэтому в сценариях со многими производителями/потребителями операция может быть медленнее.
ConcurrentLinkedQueue
не использует блокировки, но CAS, на своих операциях put/take, потенциально уменьшающих конкуренцию со многими потоками производителей и потребителей. Но, будучи структурой данных "без ожидания", ConcurrentLinkedQueue
не будет блокироваться при пустом значении, что означает, что потребителю нужно будет иметь дело с возвращаемыми значениями take()
null
словами "ожидание", например, с потребительской нитью поедание процессора.
Итак, какой из них "лучше" зависит от количества потребительских потоков, от скорости их потребления/производства и т.д. Для каждого сценария необходим эталон.
В одном конкретном случае, когда ConcurrentLinkedQueue
явно лучше, когда производители сначала производят что-то и заканчивают свою работу, помещая работу в очередь и только после, потребители начинают потреблять, зная, что они будут выполняться, когда очередь пуста. (здесь нет concurrency между производителем-потребителем, но только между производителем-производителем и потребителем-потребителем)
Ответ 4
Если ваша очередь не расширяется и содержит только один поток производителя/потребителя. Вы можете использовать блокированную очередь (вам не нужно блокировать доступ к данным).
Ответ 5
FYI, попробуйте это --- Вид хакерского, не очень научного, но, возможно, интересного:
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
class QueueDemo {
private static final int NUM_NODES = 1000000;
private static final int NUM_TRIALS = 10;
private static final int NUM_THREADS = 8;
private static final Object ANY_OBJECT = new Object();
public static void main(String[] args) {
Queue<Object> qN = new ConcurrentLinkedQueue<>();
Queue<Object> qB = new LinkedBlockingQueue<>();
Queue<Object> qA = new ArrayBlockingQueue<>(NUM_NODES+1);
for (int i=0 ; i<NUM_TRIALS ; i++) {
doOneTrial(qN, "non-blocking");
doOneTrial(qB, " blocking");
doOneTrial(qA, " Array");
}
}
private static void doOneTrial(final Queue<Object> q, String name) {
List<CompletableFuture<Integer>> futures = new ArrayList<>(NUM_THREADS);
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(NUM_THREADS);
fillQueue(q);
for (int i=0 ; i<NUM_THREADS ; i++) {
futures.add(CompletableFuture.supplyAsync(new Supplier<Integer>() {
public Integer get() {
int count = 0;
wait4(startSignal);
while (q.poll() != null) count++;
doneSignal.countDown();
return count;
}
}));
}
long startTime = System.currentTimeMillis();
startSignal.countDown();
wait4(doneSignal);
long endTime = System.currentTimeMillis();
int count = 0;
for (CompletableFuture<Integer> future : futures) {
count += future.join();
}
if (count == NUM_NODES) {
System.out.println(name + ", " + Long.toString(endTime-startTime));
} else {
System.out.println("Aieeeeeegh!");
System.exit(1);
}
}
private static void fillQueue(Queue<Object> q) {
for (int i=0 ; i<NUM_NODES ; i++) {
q.add(ANY_OBJECT);
}
}
private static void wait4(CountDownLatch latch) {
try {
latch.await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
Ответ 6
Другим решением (которое недостаточно масштабируется) является канал рандеву: java.util.concurrent SynchronousQueue