Java concurrency: многие авторы, один читатель
Мне нужно собрать некоторые статистические данные в моем программном обеспечении, и я пытаюсь сделать это быстро и правильно, что нелегко (для меня!)
сначала мой код до сих пор с двумя классами, StatsService и StatsHarvester
public class StatsService
{
private Map<String, Long> stats = new HashMap<String, Long>(1000);
public void notify ( String key )
{
Long value = 1l;
synchronized (stats)
{
if (stats.containsKey(key))
{
value = stats.get(key) + 1;
}
stats.put(key, value);
}
}
public Map<String, Long> getStats ( )
{
Map<String, Long> copy;
synchronized (stats)
{
copy = new HashMap<String, Long>(stats);
stats.clear();
}
return copy;
}
}
это мой второй класс, харвестер, который время от времени собирает статистику и записывает их в базу данных.
public class StatsHarvester implements Runnable
{
private StatsService statsService;
private Thread t;
public void init ( )
{
t = new Thread(this);
t.start();
}
public synchronized void run ( )
{
while (true)
{
try
{
wait(5 * 60 * 1000); // 5 minutes
collectAndSave();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
private void collectAndSave ( )
{
Map<String, Long> stats = statsService.getStats();
// do something like:
// saveRecords(stats);
}
}
Во время выполнения у него будет около 30 одновременных рабочих потоков, каждый из которых вызывает notify(key)
около 100 раз. Только один статистический прибор вызывает statsService.getStats()
Итак, у меня много писателей и только один читатель. было бы неплохо иметь точную статистику, но мне все равно, если некоторые записи будут потеряны на высоком уровне concurrency.
Читатель должен работать каждые 5 минут или что-то разумное.
Письмо должно быть как можно быстрее. Чтение должно быть быстрым, но если он запирает около 300 мс каждые 5 минут, это прекрасно.
Я прочитал много документов (Java concurrency на практике, эффективная Java и т.д.), но у меня есть сильное чувство, что мне нужен ваш совет, чтобы понять это.
Надеюсь, я сказал, что моя проблема ясна и достаточно короткая, чтобы получить ценную помощь.
ИЗМЕНИТЬ
Спасибо всем за подробные и полезные ответы. Как я и ожидал, есть несколько способов сделать это.
Я проверил большинство ваших предложений (я понял) и загрузил тестовый проект в код google для дальнейшей справки (проект maven)
http://code.google.com/p/javastats/
Я тестировал различные реализации моей программы StatsService
- HashMapStatsService (HMSS)
- ConcurrentHashMapStatsService (CHMSS)
- LinkedQueueStatsService (LQSS)
- Служба GoogleStatsService (GSS)
- ExecutorConcurrentHashMapStatsService (ECHMSS)
- ExecutorHashMapStatsService (EHMSS)
и я проверил их с числом x
числа потоков, каждый из которых уведомляет y
раз, результаты находятся в ms
10,100 10,1000 10,5000 50,100 50,1000 50,5000 100,100 100,1000 100,5000
GSS 1 5 17 7 21 117 7 37 254 Summe: 466
ECHMSS 1 6 21 5 32 132 8 54 249 Summe: 508
HMSS 1 8 45 8 52 233 11 103 449 Summe: 910
EHMSS 1 5 24 7 31 113 8 67 235 Summe: 491
CHMSS 1 2 9 3 11 40 7 26 72 Summe: 171
LQSS 0 3 11 3 16 56 6 27 144 Summe: 266
В этот момент я думаю, что буду использовать ConcurrentHashMap, так как он предлагает хорошую производительность, хотя это довольно легко понять.
Спасибо за ваш вклад!
Janning
Ответы
Ответ 1
В то время как гнездо ускользало от вас, вы можете использовать библиотеку java.util.concurrent, которая включает ConcurrentHashMap и AtomicLong. Вы можете включить AtomicLong, если отсутствует else, вы можете увеличить значение. Поскольку AtomicLong является потокобезопасным, вы сможете увеличивать переменную, не беспокоясь о проблеме concurrency.
public void notify(String key) {
AtomicLong value = stats.get(key);
if (value == null) {
value = stats.putIfAbsent(key, new AtomicLong(1));
}
if (value != null) {
value.incrementAndGet();
}
}
Это должно быть как быстрым, так и потокобезопасным
Изменить: Реализован сглаженно, поэтому существует только не более двух запросов.
Ответ 2
Почему бы вам не использовать java.util.concurrent.ConcurrentHashMap<K, V>
? Он обрабатывает все, что внутренне избегает бесполезных замков на карте и экономит вам много работы: вам не придется заботиться о синхронизации при получении и установке.
Из документации:
Хэш-таблица, поддерживающая полный concurrency выборки и настраиваемый ожидаемый concurrency для обновлений. Этот класс подчиняется той же функциональной спецификации, что и Hashtable, и включает версии методов, соответствующих каждому методу Hashtable. Однако, несмотря на то, что все операции являются потокобезопасными, операции поиска не влекут за собой блокировку, и нет никакой поддержки для блокировки всей таблицы таким образом, чтобы предотвратить весь доступ.
Вы можете указать уровень concurrency:
Разрешенный concurrency среди операций обновления управляется необязательным аргументом конструктора concurrencyLevel (по умолчанию 16), который используется как подсказка для внутреннего размера. Таблица внутренне разделена, чтобы попытаться разрешить указанное количество одновременных обновлений без конкуренции. Поскольку размещение в хэш-таблицах по существу является случайным, фактический concurrency будет отличаться. В идеале вы должны выбрать значение для размещения как можно большего количества потоков, которые будут одновременно изменять таблицу. Использование значительно более высокой ценности, чем вам нужно, может тратить пространство и время, а значительно меньшее значение может привести к конфликту с потоком. Но переоценки и недооценки в порядке величины обычно не оказывают заметного влияния. Значение одного является подходящим, когда известно, что только один поток будет изменен, и все остальные будут только читать. Кроме того, изменение размера этой или любой другой хэш-таблицы является относительно медленной операцией, поэтому, когда это возможно, рекомендуется представить оценки ожидаемых размеров таблиц в конструкторах.
Как указано в комментариях, внимательно прочитайте документацию ConcurrentHashMap, особенно когда она заявляет об атомных или неатомных операциях.
Чтобы иметь гарантию атомарности, вы должны рассмотреть, какие операции являются атомарными, из интерфейса ConcurrentMap
вы узнаете, что:
V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)
можно безопасно использовать.
Ответ 3
Я бы предложил взглянуть на библиотеку Java util.concurrent. Я думаю, вы можете реализовать это решение намного чище. Я не думаю, что вам нужна карта здесь вообще. Я бы рекомендовал реализовать это, используя ConcurrentLinkedQueue. Каждый "продюсер" может свободно писать в эту очередь, не беспокоясь о других. Он может помещать объект в очередь с данными для его статистики.
Харвестер может потреблять очередь, постоянно вытаскивая данные и обрабатывая их. Затем он может сохранить его, но он нуждается.
Ответ 4
Ответ Криса Дайла выглядит как хороший подход.
Другой альтернативой может быть использование параллельного Multiset
. Существует одна в Библиотека коллекций Google. Вы можете использовать это следующим образом:
private Multiset<String> stats = ConcurrentHashMultiset.create();
public void notify ( String key )
{
stats.add(key, 1);
}
Глядя на source, это реализовано с помощью ConcurrentHashMap
и с использованием putIfAbsent
и трехпараметрической версии replace
для обнаружения одновременных изменений и повторных попыток.
Ответ 5
Другой подход к проблеме заключается в том, чтобы использовать (тривиальную) безопасность потоков через ограничение потока. В основном создайте один фоновый поток, который заботится о чтении и записи. Он имеет довольно хорошие характеристики с точки зрения масштабируемости и простоты.
Идея состоит в том, что вместо всех потоков, пытающихся напрямую обновить данные, они создают задачу "обновления" для обработки фонового потока. Тот же поток также может выполнять задачу чтения, если предположить, что некоторые задержки в обработке обновлений являются допустимыми.
Этот дизайн довольно хорош, потому что потокам больше не придется конкурировать за блокировку для обновления данных, а так как карта ограничена одним потоком, вы можете просто использовать простой HashMap для получения/размещения и т.д. В условия реализации, это будет означать создание единого поточного исполнителя и отправку задач записи, которые также могут выполнять необязательную операцию "collectAndSave".
Эскиз кода может выглядеть следующим образом:
public class StatsService {
private ExecutorService executor = Executors.newSingleThreadExecutor();
private final Map<String,Long> stats = new HashMap<String,Long>();
public void notify(final String key) {
Runnable r = new Runnable() {
public void run() {
Long value = stats.get(key);
if (value == null) {
value = 1L;
} else {
value++;
}
stats.put(key, value);
// do the optional collectAndSave periodically
if (timeToDoCollectAndSave()) {
collectAndSave();
}
}
};
executor.execute(r);
}
}
Существует BlockingQueue, связанный с исполнителем, и каждый поток, который создает задачу для StatsService, использует BlockingQueue. Ключевым моментом является следующее: продолжительность блокировки для этой операции должна быть значительно короче, чем продолжительность блокировки в исходном коде, поэтому утверждение должно быть намного меньше. В целом это должно привести к значительно лучшей пропускной способности и задержке.
Другим преимуществом является то, что, поскольку только один поток считывает и записывает на карту, можно использовать простой HashMap и примитивный длинный тип (нет ConcurrentHashMap или атомных типов). Это также упрощает код, который фактически обрабатывает его очень много.
Надеюсь, что это поможет.
Ответ 6
Вы просмотрели ScheduledThreadPoolExecutor
? Вы можете использовать это, чтобы запланировать своих авторов, которые могли бы все записывать в параллельную коллекцию, например, ConcurrentLinkedQueue
, упомянутую @Chris Dail. У вас может быть задание отдельного расписания для чтения из очереди по мере необходимости, и Java SDK должен обрабатывать практически все ваши проблемы с concurrency, не требуется ручная блокировка.
Ответ 7
Если мы игнорируем часть сбора урожая и фокусируемся на написании, основным узким местом программы является то, что статистика блокируется с очень грубым уровнем детализации. Если два потока хотят обновить разные ключи, они должны ждать.
Если вы заранее знаете набор ключей и можете предварительно инициализировать карту, чтобы к моменту появления потока обновлений ключ гарантированно существовал, вы могли бы сделать блокировку на переменной аккумулятора вместо всей карты, или использовать поточный аккумуляторный объект.
Вместо того, чтобы реализовать это самостоятельно, существуют реализации карт, специально разработанные для concurrency, и сделайте это более мелкозернистую блокировку для вас.
Одно из предостережений - это статистика, так как вам нужно будет получить блокировки на всех аккумуляторах примерно в одно и то же время. Если вы используете существующую concurrency -дружественную карту, может быть конструкция для получения моментального снимка.
Ответ 8
Другая альтернатива для реализации обоих методов с помощью ReentranReadWriteLock. Эта реализация защищает от условий гонки в методе getStats, если вам нужно очистить счетчики. Также он удаляет изменяемый AtomicLong из getStats и использует неизменяемый Long.
public class StatsService {
private final Map<String, AtomicLong> stats = new HashMap<String, AtomicLong>(1000);
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public void notify(final String key) {
r.lock();
AtomicLong count = stats.get(key);
if (count == null) {
r.unlock();
w.lock();
count = stats.get(key);
if(count == null) {
count = new AtomicLong();
stats.put(key, count);
}
r.lock();
w.unlock();
}
count.incrementAndGet();
r.unlock();
}
public Map<String, Long> getStats() {
w.lock();
Map<String, Long> copy = new HashMap<String, Long>();
for(Entry<String,AtomicLong> entry : stats.entrySet() ){
copy.put(entry.getKey(), entry.getValue().longValue());
}
stats.clear();
w.unlock();
return copy;
}
}
Я надеюсь, что это поможет, любые комментарии приветствуются!
Ответ 9
Вот как это сделать с минимальным воздействием на производительность измеряемых потоков. Это самое быстрое решение на Java, не прибегая к специальным аппаратным регистрам для подсчета производительности.
Каждый поток выводит свою статистику независимо от других, то есть без синхронизации, с некоторым объектом статистики. Сделайте поле, содержащее подсчет, изменчивым, поэтому он огорожен памятью:
class Stats
{
public volatile long count;
}
class SomeRunnable implements Runnable
{
public void run()
{
doStuff();
stats.count++;
}
}
У вас есть другой поток, который содержит ссылку на все объекты Stats, периодически обходите их все и добавляйте подсчеты по всем потокам:
public long accumulateStats()
{
long count = previousCount;
for (Stats stat : allStats)
{
count += stat.count;
}
long resultDelta = count - previousCount;
previousCount = count;
return resultDelta;
}
Этот поток сборщика также нуждается в добавлении к нему спящего() (или некоторого другого дросселя). Он может периодически выводить counts/sec на консоль, например, чтобы дать вам "живое" представление о том, как работает ваше приложение.
Это позволяет избежать накладных расходов синхронизации столько, сколько вы можете.
Другим трюком для рассмотрения является заполнение объектов Stats до 128 (или 256 байтов на SandyBridge или более поздней версии), чтобы поддерживать разные потоки в разных строках кэша, или будет иметь место кеширование конкуренции на процессоре.
Когда только один поток читает и один пишет, вам не нужны блокировки или атомы, достаточно летучих. Там все равно будет некоторая проблема с потоком, когда поток чтения статистики взаимодействует с линией кэша ЦП измеряемого потока. Этого нельзя избежать, но это способ сделать это с минимальным воздействием на текущую нить; прочитайте статистику, возможно, раз в секунду или меньше.