Java-исполнитель с контролем дросселирования/пропускной способности

Я ищу Java-исполнителя, который позволяет мне указывать ограничения дросселирования/пропускной способности/стимуляции, например, не более, чем 100 запросов могут быть обработаны за секунду - если больше заданий будет отправлено, они должны попасть в очередь и выполненный позже. Основная цель этого заключается в том, чтобы избежать использования ограничений при использовании внешних API или серверов.

Мне интересно, поддерживает ли базовая Java (что я сомневаюсь, потому что я проверил) или где-то еще надежное (например, Apache Commons), или если я должен написать свой собственный. Предпочтительно что-то легкое. Я не прочь написать его сам, но если есть "стандартная" версия, где-то я, по крайней мере, хотел бы взглянуть на нее в первую очередь.

Ответы

Ответ 1

Взгляните на guavas RateLimiter:

Ограничитель скорости. Концептуально ограничитель скорости распределяет разрешения на настраиваемая скорость. Каждый приобретает() блокирует, если необходимо, до разрешения доступен, а затем берет его. После приобретения разрешений не требуется выпущенный. Ограничители ставок часто используются для ограничения скорости, с которой доступ к некоторому физическому или логическому ресурсу. Это контрастирует с Семафор, который ограничивает количество одновременных доступов вместо (обратите внимание, что concurrency и скорость тесно связаны, например см. Маленький закон).

Его потоки, но все же @Beta. В любом случае, стоит попробовать.

Вам придется привязать каждый вызов к Executor относительно ограничителя скорости. Для более чистого решения вы можете создать своего рода оболочку для ExecutorService.

Из javadoc:

 final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
  void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
      rateLimiter.acquire(); // may wait
      executor.execute(task);
    }
  }

Ответ 2

Java Executor не предлагает такого ограничения, только ограничение количества потоков, которое не является тем, что вы ищете.

В общем случае Исполнитель - это неправильное место для ограничения таких действий в любом случае, оно должно быть в тот момент, когда Thread пытается вызвать внешний сервер. Вы можете сделать это, например, имея ограничение Semaphore, что потоки ожидают прежде, чем они отправят свои запросы.

Вызов темы:

public void run() {
  // ...
  requestLimiter.acquire();
  connection.send();
  // ...
 }

В то же время вы планируете (один) дополнительный поток периодически (например, каждые 60 секунд) освобождать приобретенные ресурсы:

 public void run() {
  // ...
  requestLimiter.drainPermits();  // make sure not more than max are released by draining the Semaphore empty
  requestLimiter.release(MAX_NUM_REQUESTS);
  // ...
 }

Ответ 3

не более 100 команд могут быть обработаны за секунду - если больше задачи получаются, они должны быть поставлены в очередь и выполняться позже

Вам нужно посмотреть Executors.newFixedThreadPool(int limit). Это позволит вам ограничить количество потоков, которые могут быть выполнены одновременно. Если вы отправляете несколько потоков, они будут поставлены в очередь и будут выполнены позже.

ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 =  threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);  
...  

Сниппета выше показывает, как вы будете работать с ExecutorService, который позволяет одновременно выполнять не более 100 потоков.

Update:
Перейдя по комментариям, вот что я придумал (вроде глупо). Как насчет ручного отслеживания потоков, которые должны быть выполнены? Как сохранить их сначала в ArrayList, а затем отправить их в Executor в зависимости от того, сколько потоков уже выполнено за последнюю секунду.
Итак, давайте скажем, что 200 задач были отправлены в наш поддерживаемый ArrayList, мы можем повторить и добавить 100 в Executor. Когда секунда проходит, мы можем добавить еще несколько потоков, исходя из того, сколько из них выполнено в Executor и т.д.

Ответ 4

В зависимости от сценария и, как было предложено в одном из предыдущих ответов, основные функции ThreadPoolExecutor могут сделать трюк.

Но если threadpool используется несколькими клиентами и вы хотите дросселировать, чтобы ограничить использование каждого из них, удостоверьтесь, что один клиент не будет использовать все потоки, тогда BoundedExecutor выполнит эту работу.

Более подробную информацию можно найти в следующем примере:

http://jcip.net/listings/BoundedExecutor.java

Ответ 5

Лично я нашел этот сценарий довольно интересным. В моем случае я хотел подчеркнуть, что интересной фазой дроссельной заслонки является потребляющая сторона, как в классической параллельной теории Producer/Consumer. Это противоположность некоторых из предложенных ответов раньше. Это означает, что мы не хотим блокировать подающий поток, но блокируем потоки потребления, основанные на политике (задача/секунда). Таким образом, даже если есть задачи, готовые в очереди, выполнение/потребление потоков может блокировать ожидание, чтобы выполнить политику переноса.

Тем не менее, я думаю, что хорошим кандидатом будет Executors.newScheduledThreadPool(int corePoolSize). Таким образом, вам понадобится простая очередь перед исполнителем (просто подходит LinkedBlockingQueue), а затем запланируйте периодическую задачу для выбора текущих задач из очереди (ScheduledExecutorService.scheduleAtFixedRate). Таким образом, это не простое решение, но оно должно выполнять достаточно гог, если вы попытаетесь разогнать потребителей, как обсуждалось ранее.

Ответ 6

Можно ограничить его внутри Runnable:

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

Взять из JAVA Thread Debounce и Throttle