Ограниченный приоритетBlockingQueue

PriorityBlockingQueue неограничен, но мне нужно каким-то образом связать его. Каков наилучший способ достичь этого?

Для информации ограниченный PriorityBlockingQueue будет использоваться в ThreadPoolExecutor.

NB: ограниченным я не хочу бросать Exception, если это произойдет, я хочу поместить объект в очередь, а затем вырезать его на основе его значения приоритета. Есть ли хороший способ сделать это?

Ответы

Ответ 1

Я бы не подклассифицировал его. Пока я не могу составить пример кода прямо сейчас, я бы предложил версию шаблона декоратора.

Создайте новый класс и реализуйте интерфейсы, реализованные в интересующем вас классе: PriorityBlockingQueue. Я нашел следующие интерфейсы, используемые этим классом:

Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>

В конструкторе для класса принимаем PriorityBlockingQueue как параметр конструктора.

Затем реализуйте все методы, требуемые интерфейсами, через экземпляры PriorityBlockingQueue. Добавьте любой код, необходимый для его ограничения.

Вот краткая диаграмма, которую я собрал в Violet UML:

Диаграмма классов BoundedPriorityblockingQueue http://theopensourceu.com/wp-content/uploads/2010/03/BoundedPriorityBlockingQueue.png

Ответ 2

Реализация этого в библиотеке Google Collections/Guava: MinMaxPriorityQueue.

Очередь приоритетов min-max может быть настроена с максимальным размером. Если так, каждый раз, когда размер очереди превышает это значение, очередь автоматически удаляет свой самый большой элемент в соответствии со своим компаратором (который может быть только что добавленным элементом). Это другое из обычных ограниченных очередей, которые либо блокируют, либо отклоняют новые элементов при заполнении.

Ответ 3

В верхней части моей головы я подклассифицировал его и перезаписал метод put, чтобы обеспечить его соблюдение. Если он перебрасывает исключение или делает все, что кажется подходящим.

Что-то вроде:

public class LimitedPBQ extends PriorityBlockingQueue {

    private int maxItems;
    public LimitedPBQ(int maxItems){
        this.maxItems = maxItems;
    }

    @Override
    public boolean offer(Object e) {
        boolean success = super.offer(e);
        if(!success){
            return false;
        } else if (this.size()>maxItems){
            // Need to drop last item in queue
            // The array is not guaranteed to be in order, 
            // so you should sort it to be sure, even though Sun Java 6 
            // version will return it in order
            this.remove(this.toArray()[this.size()-1]);
        }
        return true;
    }
}

Изменить: оба добавить и поставить предложение invoke, поэтому переопределить его должно быть достаточно

Правка 2: теперь нужно удалить последний элемент, если над maxItems. Однако может быть более элегантный способ сделать это.

Ответ 4

После внедрения BoundedPriorityBlockingQueue в соответствии с тем, что предложил Фрэнк В., я понял, что не сделал то, что хотел. Основная проблема заключается в том, что элемент, который я должен вставить в очередь, может быть более высоким приоритетом, чем все, что уже находится в очереди. Таким образом, я действительно хочу, это метод "pivot", если я помещаю объект в очередь, когда очередь заполнена, я хочу вернуть объект с наименьшим приоритетом, а не блокировать.

Чтобы отразить предложения Frank V, я использовал следующие фрагменты...

public class BoundedPriorityBlockingQueue<E> 
   implements 
     Serializable, 
     Iterable<E>, 
     Collection<E>, 
     BlockingQueue<E>, 
     Queue<E>, 
     InstrumentedQueue 
{

...   закрытый закрытый замок ReentrantLock;//= новый ReentrantLock();   частный конечный Состояние notFull;

final private int capacity;
final private PriorityBlockingQueue<E> queue;

public BoundedPriorityBlockingQueue(int capacity) 
  throws IllegalArgumentException, 
         NoSuchFieldException, 
         IllegalAccessException 
{
   if (capacity < 1) throw 
       new IllegalArgumentException("capacity must be greater than zero");      
   this.capacity = capacity;
   this.queue = new PriorityBlockingQueue<E>();

   // gaining access to private field
   Field reqField;
   try {
    reqField = PriorityBlockingQueue.class.getDeclaredField("lock");
    reqField.setAccessible(true);
    this.lock = (ReentrantLock)reqField.get(ReentrantLock.class);
    this.notFull = this.lock.newCondition();

   } catch (SecurityException ex) {
    ex.printStackTrace();
    throw ex;
   } catch (NoSuchFieldException ex) {
    ex.printStackTrace();
    throw ex;
   } catch (IllegalAccessException ex) {
    ex.printStackTrace();
    throw ex;
   }

...

@Override
public boolean offer(E e) {
    this.lock.lock();
    try {
        while (this.size() == this.capacity)
            notFull.await();
        boolean success = this.queue.offer(e);
        return success;
    } catch (InterruptedException ie) {
        notFull.signal(); // propagate to a non-interrupted thread
        return false;

    } finally {
        this.lock.unlock();
    }
}
...

У этого также есть некоторые инструменты, поэтому я могу проверить эффективность очереди. Я все еще работаю над "PivotPriorityBlockingQueue", если кто-то заинтересован, я могу опубликовать его.

Ответ 5

Если порядок Runnables, который вы хотите выполнить, не является строгим (как есть: может случиться, что выполняются некоторые задачи с более низким приоритетом, даже если существуют задачи с более высоким приоритетом), тогда я бы предложил следующее, которое сводится к периодическому уменьшая размер PriorityQueue:

if (queue.size() > MIN_RETAIN * 2){
    ArrayList<T> toRetain = new ArrayList<T>(MIN_RETAIN);
    queue.drainTo(toRetain, MIN_RETAIN);
    queue.clear();
    for (T t : toRetain){
      queue.offer(t);
    }
}

Это, очевидно, не удастся, если порядок должен быть строгим, поскольку дренаж приведет к моменту, из-за которого будет выполняться низкоприоритетная задача, из параллельного доступа будет получена.

Преимущества заключаются в том, что это поточно-безопасный и, вероятно, будет работать так быстро, как вы можете сделать с дизайном очереди приоритетов.

Ответ 6

До сих пор ни один ответ не имеет следующих свойств:

  • Реализует интерфейс BlockingQueue.
  • Поддерживает удаление абсолютного наибольшего значения.
  • Нет условий гонки.

К сожалению, в стандартной библиотеке Java нет реализации BlockingQueue. Вам нужно либо найти реализацию, либо реализовать что-то самостоятельно. Для реализации BlockingQueue потребуются некоторые знания о правильной блокировке.

Вот что я предлагаю: посмотрите https://gist.github.com/JensRantil/30f812dd237039257a3d и используйте его в качестве шаблона для реализации собственной оболочки вокруг SortedSet, В принципе, все блокировки есть и есть несколько модульных тестов (которые нуждаются в некоторой настройке).

Ответ 8

Посмотрите ForwardingQueue из API коллекций Google. Для блокировки семантики вы можете использовать Semaphore.

Ответ 9

Существует другая реализация здесь

Кажется, он делает то, о чем вы просите:

A BoundedPriorityQueue реализует очередь приоритетов с верхней границей числа элементов. Если очередь не заполнена, добавленные элементы всегда добавляются. Если очередь заполнена, а добавленный элемент больше самого маленького элемента в очереди, то самый маленький элемент удаляется и добавляется новый элемент. Если очередь заполнена, а добавленный элемент не больше самого маленького элемента в очереди, новый элемент не добавляется.