С++ 11 упорядочение атомной памяти - это правильное использование упорядоченного (освобождения-потребления) заказа?
Недавно я сделал порт для С++ 11, используя std:: atomic тройного буфера, который будет использоваться в качестве механизма синхронизации concurrency. Идея этого подхода синхронизации потоков заключается в том, что для ситуации с производителем-потребителем, когда у вас есть производитель, который работает быстрее, чем потребитель, тройная буферизация может дать некоторые преимущества, поскольку поток производителя не будет "замедлен", если придется ждать для потребителя. В моем случае у меня есть физический поток, который обновляется со скоростью ~ 120 кадров в секунду и поток рендеринга, который работает на скорости ~ 60 кадров в секунду. Очевидно, что я хочу, чтобы поток рендеринга всегда получал самое последнее состояние, но я также знаю, что я буду пропускать много кадров из физического потока из-за разницы в ставках. С другой стороны, я хочу, чтобы мой физический поток поддерживал постоянную скорость обновления и не ограничивался медленным потоком рендеринга, блокирующим мои данные.
Оригинальный код C был сделан remis-мыслями, и полное объяснение находится в его blog. Я призываю всех, кто интересуется его чтением, для дальнейшего понимания первоначальной реализации.
Моя реализация может быть найдена здесь.
Основная идея состоит в том, чтобы иметь массив с тремя позициями (буферами) и атомным флагом, который сравнивается и заменяется, чтобы определить, какие элементы массива соответствуют какому состоянию в любой момент времени. Таким образом, только одна атомная переменная используется для моделирования всех 3 индексов массива и логики тройной буферизации. Буферные 3 позиции называются Dirty, Clean и Snap. Производитель всегда пишет индекс Dirty и может перевернуть запись, чтобы поменять местами Dirty с текущим индексом Clean. Потребитель может запросить новую привязку, которая свопирует текущий индекс Snap с индексом Clean, чтобы получить самый последний буфер. Пользователь всегда считывает буфер в позиции привязки.
Флаг состоит из 8-битного без знака int, а биты соответствуют:
(не используется) (новая запись) (2x грязная) (2x чистая) (2x привязка)
Дополнительный бит бит newWrite устанавливается автором и очищается читателем. Читатель может использовать это, чтобы проверить, были ли какие-либо записи с момента последней привязки, и если нет, это не займет еще одну привязку. Флаг и индексы могут быть получены с помощью простых побитовых операций.
Теперь для кода:
template <typename T>
class TripleBuffer
{
public:
TripleBuffer<T>();
TripleBuffer<T>(const T& init);
// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;
T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean
T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)
private:
bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes
// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex = (flags & 0x3)
mutable atomic_uint_fast8_t flags;
T buffer[3];
};
реализация:
template <typename T>
TripleBuffer<T>::TripleBuffer(){
T dummy = T();
buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){
buffer[0] = init;
buffer[1] = init;
buffer[2] = init;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
T TripleBuffer<T>::snap() const{
return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}
template <typename T>
void TripleBuffer<T>::write(const T newT){
buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}
template <typename T>
bool TripleBuffer<T>::newSnap(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}
template <typename T>
void TripleBuffer<T>::flipWriter(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}
template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}
template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}
template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}
Как вы можете видеть, я решил использовать шаблон Release-Consume для упорядочения памяти.
Релиз (memory_order_release) для хранилища гарантирует, что никакие записи в текущем потоке не могут быть переупорядочены после магазина. С другой стороны, Consume гарантирует, что никакие чтения в текущем потоке, зависящие от загружаемого значения, могут быть переупорядочены до этой нагрузки. Это гарантирует, что записи в зависимые переменные в других потоках, выпускающих одну и ту же атомную переменную, будут видны в текущем потоке.
Если мое понимание правильное, так как мне нужны только те атомы, которые нужно установить атомом, операции с другими переменными, которые не влияют непосредственно на флаги, могут свободно переупорядочиваться компилятором, что позволяет увеличить оптимизацию. Из чтения некоторых документов по новой модели памяти я также знаю, что эти расслабленные атомы будут иметь заметное влияние только на такие платформы, как ARM и POWER (они были введены главным образом из-за них). Поскольку я нацелен на ARM, я считаю, что я мог бы извлечь выгоду из этих операций и уметь выжать немного больше производительности.
Теперь на вопрос:
Правильно ли я использую расслабленное упорядочение Release-Consume для этой конкретной проблемы?
Спасибо,
Андре
PS: Извините за длинный пост, но я полагал, что для лучшего обзора проблемы нужен какой-то достойный контекст.
РЕДАКТИРОВАТЬ:
Реализованные предложения @Yakk:
- Исправлено
flags
чтение на newSnap()
и flipWriter()
, которые использовали прямое назначение, поэтому использовали по умолчанию load(std::memory_order_seq_cst)
.
- Для ясности перемещены операции с чередованием бит на выделенные функции.
- Добавлен возвращаемый тип
bool
в newSnap()
, теперь возвращает false, если в противном случае ничего нового и не было.
- Определенный класс как не скопируемый с помощью
= delete
idiom, поскольку оба конструктора копирования и присваивания были небезопасными, если использовался TripleBuffer
.
ИЗМЕНИТЬ 2:Исправлено описание, которое было неверным (спасибо @Useless). Покупатель запрашивает новую привязку и считывает из индекса Snap (а не "писатель" ). Извините за отвлечение и спасибо Бесполезным за указание на это.
ИЗМЕНИТЬ 3:
Оптимизировал функции newSnap()
и flipriter()
в соответствии с предложениями @Display Name, эффективно удалив 2 избыточных load()
за цикл цикла.
Ответы
Ответ 1
Почему вы загружаете значение старых флагов дважды в циклы CAS? Первый раз - flags.load()
, а второй - compare_exchange_weak()
, который стандарт указывает на отказ CAS, загружает предыдущее значение в первый аргумент, который в этом случае является flagsNow.
Согласно http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange, "В противном случае загружает фактическое значение, хранящееся в * это в ожидаемое (выполняет операцию загрузки)." То, что делает ваш цикл, это то, что при ошибке compare_exchange_weak()
перезагружает flagsNow
, тогда цикл повторяется, и первый оператор загружает его еще раз, сразу после загрузки compare_exchange_weak()
. Мне кажется, что ваша петля должна вместо этого вытащить нагрузку за пределы петли. Например, newSnap()
будет:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));
и flipWriter()
:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
Ответ 2
Да, это разница между memory_order_acquire и memory_order_consume, но вы не заметите этого, когда используете его 180 раз в секунду. Вы можете запустить мой тест с m2 = memory_order_consume, если вы хотите узнать ответ в цифрах. Просто измените имя производителя_or_consumer_Thread на что-то вроде этого:
TripleBuffer <int> tb;
void producer_or_consumer_Thread(void *arg)
{
struct Arg * a = (struct Arg *) arg;
bool succeeded = false;
int i = 0, k, kold = -1, kcur;
while (a->run)
{
while (a->wait) a->is_waiting = true; // busy wait
if (a->producer)
{
i++;
tb.update(i);
a->counter[0]++;
}
else
{
kcur = tb.snap();
if (kold != -1 && kcur != kold) a->counter[1]++;
succeeded = tb0.newSnap();
if (succeeded)
{
k = tb.readLast();
if (kold == -1)
kold = k;
else if (kold = k + 1)
kold = k;
else
succeeded = false;
}
if (succeeded) a->counter[0]++;
}
}
a->is_waiting = true;
}
ИСПЫТАНИЕ Результат:
_#_ __Produced __Consumed _____Total
1 39258150 19509292 58767442
2 24598892 14730385 39329277
3 10615129 10016276 20631405
4 10617349 10026637 20643986
5 10600334 9976625 20576959
6 10624009 10069984 20693993
7 10609040 10016174 20625214
8 25864915 15136263 41001178
9 39847163 19809974 59657137
10 29981232 16139823 46121055
11 10555174 9870567 20425741
12 25975381 15171559 41146940
13 24311523 14490089 38801612
14 10512252 9686540 20198792
15 10520211 9693305 20213516
16 10523458 9720930 20244388
17 10576840 9917756 20494596
18 11048180 9528808 20576988
19 11500654 9530853 21031507
20 11264789 9746040 21010829