Компьютерная карта: вычисление стоимости загодя
У меня есть расчетная карта (с мягкие значения), который я использую для кэширования результатов дорогостоящего вычисления.
Теперь у меня есть ситуация, когда я знаю, что конкретный ключ, скорее всего, будет рассмотрен в течение следующих нескольких секунд. Этот ключ также более дорог для вычисления, чем большинство.
Я хотел бы заранее вычислить значение в потоке с минимальным приоритетом, чтобы при запросе значения он уже был кэширован, улучшив время отклика.
Что такое хороший способ сделать это, чтобы:
- У меня есть контроль над потоком (в частности, его приоритетом), в котором выполняется вычисление.
- Дублировать работу можно избежать, т.е. вычисление выполняется только один раз. Если задача вычисления уже запущена, то вызывающий поток ожидает эту задачу вместо вычисления значения снова (
FutureTask
реализует это. С помощью вычислений Guava это верно, если вы вызываете только get
, но нет, если вы смешиваете его с вызовами до put
.)
- Метод "вычислять значение заранее" является асинхронным и идемпотентным. Если вычисление уже выполняется, оно должно немедленно вернуться, не дожидаясь завершения этого вычисления.
- Избегать инверсии приоритета, например. если поток с высоким приоритетом запрашивает значение, а поток среднего приоритета выполняет что-то несвязанное, но задача вычисления ставится в очередь на поток с низким приоритетом, поток с высоким приоритетом не должен быть голоден. Возможно, это может быть достигнуто путем временного повышения приоритета вычислительного потока (ов) и/или выполнения вычисления в вызывающем потоке.
Как это можно скоординировать между всеми задействованными нитями?
Дополнительная информация
Вычисления в моем приложении - операции фильтрации изображений, что означает, что все они связаны с процессором. Эти операции включают в себя аффинные преобразования (от 50 мкс до 1 мс) и свертки (до 10 мс). Конечно, эффективность различных приоритетов потоков зависит от способности ОС упреждать более крупные задачи.
Ответы
Ответ 1
Вы можете организовать только однократное выполнение фонового вычисления с помощью Future с помощью ComputedMap. Будущее представляет задачу, вычисляющую значение. Будущее создается ComputedMap и в то же время передается ExecutorService для выполнения фона. Исполнитель может быть настроен с помощью собственной ThreadFactory, которая создает потоки с низким приоритетом, например.
class LowPriorityThreadFactory implements ThreadFactory
{
public Thread newThread(Runnable r) {
Tread t = new Thread(r);
t.setPriority(MIN_PRIORITY);
return t;
}
}
Когда это значение необходимо, ваш высокоприоритетный поток затем извлекает будущее из карты и вызывает метод get() для получения результата, ожидая его вычисления при необходимости. Чтобы избежать инверсия приоритета, вы добавляете в команду дополнительный код:
class HandlePriorityInversionTask extends FutureTask<ResultType>
{
Integer priority; // non null if set
Integer originalPriority;
Thread thread;
public ResultType get() {
if (!isDone())
setPriority(Thread.currentThread().getPriority());
return super.get();
}
public void run() {
synchronized (this) {
thread = Thread.currentThread();
originalPriority = thread.getPriority();
if (priority!=null) setPriority(priority);
}
super.run();
}
protected synchronized void done() {
if (originalPriority!=null) setPriority(originalPriority);
thread = null;
}
void synchronized setPriority(int priority) {
this.priority = Integer.valueOf(priority);
if (thread!=null)
thread.setPriority(priority);
}
}
Это поможет повысить приоритет задачи до приоритета потока, вызывающего get()
, если задача не завершена, и возвращает приоритет оригиналу, когда задача завершается, как обычно, или иначе. (Чтобы сохранить его кратким, код не проверяет, действительно ли приоритет больше, но его легко добавить.)
При вызове задачи с приоритетным вызовом get() будущее может еще не начаться. У вас может возникнуть соблазн избежать этого, установив большую верхнюю границу количества потоков, используемых службой-исполнителем, но это может быть плохой идеей, поскольку каждый поток может работать с высоким приоритетом, потребляя столько же процессора, сколько может раньше ОС отключает его. Вероятно, пул должен быть того же размера, что и количество аппаратных потоков, например. размер пула до Runtime.availableProcessors()
. Если задача не запущена, а не ждать, пока исполнитель заплатит ее (что является формой инверсии приоритета, так как ваш поток с высоким приоритетом ожидает завершения потоков с низким приоритетом), вы можете отказаться от него текущий исполнитель и повторно отправить на исполнитель, работающий только с высокоприоритетными потоками.
Ответ 2
Одним из распространенных способов координации такого типа ситуации является наличие карты, значения которой являются объектами FutureTask. Итак, украв в качестве примера некоторый код, который я написал с моего веб-сервера, основная идея заключается в том, что для данного параметра мы видим, есть ли уже FutureTask (что означает, что расчет с этим параметром уже запланирован) и если так мы ждем его. В этом примере мы иначе планируем поиск, но это может быть сделано в другом месте с помощью отдельного вызова, если это было бы желательно:
private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...
private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
Future<CharSequence> f = cache.get(word);
if (f == null) {
Callable<CharSequence> ex = new Callable<CharSequence>() {
public CharSequence call() throws Exception {
return doCalculation(word);
}
};
Future<CharSequence> ft = executor.submit(ex);
f = cache.putIfAbsent(word, ft);
if (f != null) {
// somebody slipped in with the same word -- cancel the
// lookup we've just started and return the previous one
ft.cancel(true);
} else {
f = ft;
}
}
return f;
}
С точки зрения приоритетов потоков: интересно, достигнет ли это того, что вы думаете? Я не совсем понимаю вашу мысль о повышении приоритета поиска над ожидающим потоком: если поток ожидает, то он ждет, независимо от относительных приоритетов других потоков... (Возможно, вам стоит взглянуть на некоторые статьи, которые я написал на приоритеты потоков и планирование потоков, но, чтобы сократить длинную историю, я не уверен, что изменение приоритета обязательно купит вам то, что вы ожидаете.)
Ответ 3
Я подозреваю, что вы движетесь по неправильному пути, сосредоточившись на приоритетах потоков. Обычно данные, хранящиеся в кеше, являются дорогостоящими для вычисления из-за ввода-вывода (данные вне памяти) и ограничения по процессору (логическое вычисление). Если вы предпочтете угадать будущее пользователя, например, глядя на непрочитанные электронные письма, то это указывает на то, что ваша работа, скорее всего, связана с I/O. Это означает, что до тех пор, пока головоломка потока не возникает (какие планировщики запрещают), игра в игры с приоритетом потока не будет предлагать большую часть улучшения производительности.
Если стоимость представляет собой вызов ввода-вывода, фоновый поток блокируется в ожидании поступления данных и обработки этих данных достаточно дешево (например, десериализация). Поскольку изменение приоритета потока не будет предлагать большую часть ускорения, выполнение работы асинхронно на фоновом потоке должно быть достаточным. Если ограничение промаха в кеше слишком велико, то использование нескольких уровней кэширования имеет тенденцию способствовать дальнейшему снижению воспринимаемой пользователем задержки.
Ответ 4
В качестве альтернативы приоритетам потоков вы можете выполнять задачу с низким приоритетом, только если не выполняются высокоприоритетные задачи. Вот простой способ сделать это:
AtomicInteger highPriorityCount = new AtomicInteger();
void highPriorityTask() {
highPriorityCount.incrementAndGet();
try {
highPriorityImpl();
} finally {
highPriorityCount.decrementAndGet();
}
}
void lowPriorityTask() {
if (highPriorityCount.get() == 0) {
lowPriorityImpl();
}
}
В вашем случае использования оба метода Impl() будут вызывать get() на вычислительной карте, highPriorityImpl() в том же потоке и lowPriorityImpl() в другом потоке.
Вы можете написать более сложную версию, которая отсылает низкоприоритетные задачи до тех пор, пока не будут выполнены высокоприоритетные задачи и не будет ограничено количество одновременных низкоприоритетных задач.