Внедрение высокоэффективного мьютекса, аналогичного Qt one
У меня многопоточное научное приложение, в котором несколько вычислительных потоков (по одному на ядро) должны хранить свои результаты в общем буфере. Для этого требуется механизм мьютекса.
Рабочие потоки тратят лишь небольшую часть времени на запись в буфер, поэтому мьютексы разблокируются большую часть времени, и блокировки имеют высокую вероятность успеха сразу, не дожидаясь разблокировки другого потока.
В настоящее время я использовал Qt QMutex для задачи, и он работает хорошо: мьютекс имеет незначительные накладные расходы.
Однако мне нужно перенести его только на С++ 11/STL. При использовании std:: mutex производительность снижается на 66%, а потоки большую часть времени тратят на блокировку мьютекса.
После другого вопроса я понял, что Qt использует быстрый механизм блокировки на основе простого атомного флага, оптимизированного для случаев, когда мьютекс еще не заблокирован. И возвращается к системному мьютексу, когда происходит параллельная блокировка.
Я хотел бы реализовать это в STL. Есть простой способ, основанный на std:: atomic и std:: mutex? Я копал в Qt-коде, но мне кажется, что это слишком сложно для моего использования (мне не нужны тайм-ауты блокировок, pimpl, малая занимаемая площадь и т.д.).
Изменить: я пробовал спин-блокировку, но это плохо работает, потому что:
Периодически (каждые несколько секунд) другой поток блокирует мьютексы и сбрасывает буфер. Это занимает некоторое время, поэтому все рабочие потоки блокируются в это время. Винтовые блоки делают планирование занятым, что приводит к тому, что флеш будет на 10-100x медленнее, чем при соответствующем мьютексе. Это неприемлемо
Изменить: я пробовал это, но он не работает (блокирует все потоки)
class Mutex
{
public:
Mutex() : lockCounter(0) { }
void lock()
{
if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
{
std::unique_lock<std::mutex> lock(internalMutex);
cv.wait(lock);
}
}
void unlock();
{
if(lockCounter.fetch_sub(1, std::memory_order_release)>1)
{
cv.notify_one();
}
}
private:
std::atomic<int> lockCounter;
std::mutex internalMutex;
std::condition_variable cv;
};
Спасибо!
Изменить: окончательное решение
Быстрое мьютекс MikeMB работал очень хорошо.
В качестве окончательного решения я сделал:
- Используйте простую прямую блокировку с помощью try_lock
- Когда поток не пытается try_lock, вместо ожидания он заполняет очередь (которая не используется совместно с другими потоками) и продолжает
- Когда поток получает блокировку, он обновляет буфер с текущим результатом, но также и с результатами, хранящимися в очереди (обрабатывает свою очередь)
- Буферная промывка была сделана гораздо более эффективно: блокирующая часть заменяет только два указателя.
Ответы
Ответ 1
Общие советы
Как уже упоминалось в некоторых комментариях, я бы сначала посмотрел, можете ли вы реструктурировать свой дизайн программы, чтобы сделать реализацию мьютекса менее критичной для вашей производительности.
Кроме того, поскольку поддержка многопоточности в стандартном С++ является довольно новой и немного инфантильной, вам иногда просто приходится возвращаться к специфичным для платформы механизмам, таким как, например, a futex
в системах Linux или критических разделах в окнах или нестандартных библиотеках, таких как Qt.
При этом я мог бы подумать о двух подходах к реализации, которые могут потенциально ускорить вашу программу:
SpinLock
Если конфликты доступа происходят очень редко, и мьютекс удерживается только на короткие промежутки времени (две вещи, к которым нужно стремиться, конечно же), может быть наиболее эффективным просто использовать спин-блокировку, так как она не требует какой-либо системы звонки вообще и его просто реализовать (взято из cppreference):
class SpinLock {
std::atomic_flag locked ;
public:
void lock() {
while (locked.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield(); //<- this is not in the source but might improve performance.
}
}
void unlock() {
locked.clear(std::memory_order_release);
}
};
Конечно, недостатком является то, что ожидающие потоки не засыпают и не крадут время обработки.
Проверено блокирование
Это, по сути, идея, которую вы продемонстрировали: сначала сделайте быструю проверку, действительно ли требуется блокировка на основе операции атомного свопа и используйте тяжелый std::mutex
, только если это неизбежно.
struct FastMux {
//Status of the fast mutex
std::atomic<bool> locked;
//helper mutex and vc on which threads can wait in case of collision
std::mutex mux;
std::condition_variable cv;
//the maximum number of threads that might be waiting on the cv (conservative estimation)
std::atomic<int> cntr;
FastMux():locked(false), cntr(0){}
void lock() {
if (locked.exchange(true)) {
cntr++;
{
std::unique_lock<std::mutex> ul(mux);
cv.wait(ul, [&]{return !locked.exchange(true); });
}
cntr--;
}
}
void unlock() {
locked = false;
if (cntr > 0){
std::lock_guard<std::mutex> ul(mux);
cv.notify_one();
}
}
};
Обратите внимание, что std::mutex
не заблокирован между lock()
и unlock()
, но он используется только для обработки переменной условия. Это приводит к большему количеству вызовов блокировки/разблокировки, если на мьютексе имеется большая перегрузка.
Проблема с вашей реализацией заключается в том, что cv.notify_one();
можно потенциально вызывать между if(lockCounter.fetch_add(1, std::memory_order_acquire)>0)
и cv.wait(lock);
, чтобы ваш поток никогда не мог проснуться.
Я не проводил сравнение производительности с фиксированной версией предлагаемой вами реализации, поэтому вам просто нужно посмотреть, что лучше всего подходит для вас.
Ответ 2
На самом деле не ответ на определение, но в зависимости от конкретной задачи незаблокированная очередь может вообще избавиться от мьютекса. Это поможет дизайну, если у вас есть несколько производителей и один потребитель (или даже несколько потребителей). Ссылки:
- Хотя не напрямую С++/STL, Boost.Lockfree предоставляет такую очередь.
- Другим вариантом является реализация очереди без блокировки в "С++ Concurrency в действии" Энтони Уильямса.
- Быстрая блокировка для С++
Обновить ответы на комментарии:
Размер/переполнение очереди:
- Невозможно избежать переполнения очереди, если i) сделает очередь достаточно большой или ii), заставив поток продюсера ждать с нажатием данных после того, как очередь будет заполнена.
- Другой вариант - использовать несколько потребителей и несколько очередей и реализовать параллельное сокращение, но это зависит от того, как обрабатываются данные.
Потребительский поток:
- В очереди можно использовать
std::condition_variable
и заставить потребительский поток ждать, пока не появятся данные.
- Другим вариантом будет использование таймера для регулярной проверки (опроса) для очереди, не являющейся пустой, после того как она непустая, поток может непрерывно извлекать данные и возвращаться в режим ожидания.