Ответ 1
Очень интересная проблема! Дорога сложнее, чем я сначала подумал:-) Мне нравятся свободные от блокировки решения, поэтому я попытался работать один из них ниже.
Есть много способов думать об этой системе. Вы можете моделировать это как циклический буфер/очередь фиксированного размера (с двумя записями), но затем вы теряете возможность обновлять следующее доступное значение для потребления, поскольку вы не знаете, начал ли потребитель читать недавно опубликованные значение или по-прежнему (потенциально) чтение предыдущего. Таким образом, дополнительное состояние необходимо за пределами стандартного кольцевого буфера, чтобы достичь более оптимального Решение.
Прежде всего, обратите внимание, что всегда есть ячейка, которую производитель может безопасно записать в в любой момент времени; если одна ячейка считывается потребителем, другие могут быть записаны. Позвоните в ячейку, которую можно безопасно записать в "активная" ячейка (ячейка, которая может быть потенциально прочитана, - это какая-либо ячейка не активный). Активная ячейка может быть переключена только в том случае, если другая ячейка не в настоящее время считывается с.
В отличие от активной ячейки, которую всегда можно записать, неактивная ячейка может только читать, если он содержит значение; как только это значение будет потреблено, оно исчезло. (Это означает, что в случае агрессивного производителя избегается ливня, а на некоторых точка, потребитель будет опорожнять ячейку и перестанет касаться клеток. однажды что происходит, производитель может определенно опубликовать значение, тогда как до этого момента, он может опубликовать только значение (изменить активную ячейку), если потребитель не находится в середина прочитанного.)
Если есть значение, которое готово к употреблению, только потребитель может изменить это факт (для неактивной ячейки, во всяком случае); последующие производства могут изменить, какая ячейка активна и опубликованная величина, но значение всегда будет готово к чтению до тех пор, пока он потреблял.
Как только производитель будет записан в активную ячейку, он может "опубликовать" это значение на изменение ячейки, являющейся активной (замена индекса), при условии, что потребитель а не в середине чтения другой ячейки. Если потребитель находится в середине читая другую ячейку, своп не может произойти, но в этом случае потребитель может обменять после того, как он будет читать значение, если производитель не находится в середине напишите (и если это так, то производитель поменяет его, как только это будет сделано). Фактически, в целом потребитель всегда может поменяться после его чтения (если это единственный доступ к системе), поскольку ложные свопы потребителя являются доброкачественными: если есть что-то в другой ячейке, тогда обмен будет заставлять это читать далее, и если там нет, замена меняет ничего.
Итак, нам нужна общая переменная для отслеживания активной ячейки. Нам также нужна как для производителя, так и для потребителя, чтобы указать, находятся ли они в середине операция. Мы можем хранить эти три части состояния в одной атомной переменной, чтобы чтобы иметь возможность воздействовать на них всех сразу (атомарно). Нам также нужен способ, чтобы потребитель мог проверить, есть ли что-либо в неактивная ячейка в первую очередь, и для обоих потоков для изменения этого состояния при необходимости. Я попробовал несколько других подходов, но, в конце концов, проще всего было просто включить эту информацию и в другую атомную переменную. Это делает многое проще рассуждать, поскольку все изменения состояния в системе являются атомарными таким образом.
Я придумал безжизненную реализацию (без блокировки, и все операции завершены в ограниченном числе инструкций).
Время кода!
#include <atomic>
#include <cstdint>
template <typename T>
class ProducerConsumerDoubleBuffer {
public:
ProducerConsumerDoubleBuffer() : m_state(0) { }
~ProducerConsumerDoubleBuffer() { }
// Never returns nullptr
T* start_writing() {
// Increment active users; once we do this, no one
// can swap the active cell on us until we're done
auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
return &m_buf[state & 1];
}
void end_writing() {
// We want to swap the active cell, but only if we were the last
// ones concurrently accessing the data (otherwise the consumer
// will do it for us when *it's* done accessing the data)
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
if ((state & 0x6) == 0) {
// The consumer wasn't in the middle of a read, we should
// swap (unless the consumer has since started a read or
// already swapped or read a value and is about to swap).
// If we swap, we also want to clear the full flag on what
// will become the active cell, otherwise the consumer could
// eventually read two values out of order (it reads a new
// value, then swaps and reads the old value while the
// producer is idle).
m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
}
}
// Returns nullptr if there appears to be no more data to read yet
T* start_reading() {
m_readState = m_state.load(std::memory_order_relaxed);
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// Nothing to read here!
return nullptr;
}
// At this point, there is guaranteed to be something to
// read, because the full flag is never turned off by the
// producer thread once it on; the only thing that could
// happen is that the active cell changes, but that can
// only happen after the producer wrote a value into it,
// in which case there still a value to read, just in a
// different cell.
m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
// Now that we've incremented the user count, nobody can swap until
// we decrement it
return &m_buf[(m_readState & 1) ^ 1];
}
void end_reading() {
if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
// There was nothing to read; shame to repeat this
// check, but if these functions are inlined it might
// not matter. Otherwise the API could be changed.
// Or just don't call this method if start_reading()
// returns nullptr -- then you could also get rid
// of m_readState.
return;
}
// Alright, at this point the active cell cannot change on
// us, but the active cell flag could change and the user
// count could change. We want to release our user count
// and remove the flag on the value we read.
auto state = m_state.load(std::memory_order_relaxed);
std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
// Oi, we were the last ones accessing the data when we released our cell.
// That means we should swap, but only if the producer isn't in the middle
// of producing something, and hasn't already swapped, and hasn't already
// set the flag we just reset (which would mean they swapped an even number
// of times). Note that we don't bother swapping if there nothing to read
// in the other cell.
m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
}
}
private:
T m_buf[2];
// The bottom (lowest) bit will be the active cell (the one for writing).
// The active cell can only be switched if there at most one concurrent
// user. The next two bits of state will be the number of concurrent users.
// The fourth bit indicates if there a value available for reading
// in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
std::atomic<std::uint32_t> m_state;
std::uint32_t m_readState;
};
Обратите внимание, что семантика такова, что потребитель никогда не может прочитать данное значение дважды, и значение, которое оно читает, всегда новее, чем последнее прочитанное значение. Это также справедливо эффективный в использовании памяти (два буфера, например, ваше исходное решение). Я избегал циклов CAS потому что они обычно менее эффективны, чем одна атомная операция в конфликте.
Если вы решите использовать вышеуказанный код, я предлагаю вам сначала написать для него полные (поточные) модульные тесты. И правильные тесты. Я проверил его, но только чуть-чуть. Дайте мне знать, если вы найдете ошибки: -)
Мой unit test:
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_writing();
if (item != nullptr) { // Always true
*item = i;
}
buf.end_writing();
}
});
std::thread consumer([&]() {
int prev = -1;
for (int i = 0; i != 500000; ++i) {
int* item = buf.start_reading();
if (item != nullptr) {
assert(*item > prev);
prev = *item;
}
buf.end_reading();
}
});
producer.join();
consumer.join();
Что касается вашей первоначальной реализации, я только смотрел на нее с легкостью (это гораздо интереснее дизайн нового материала, хех), но ответ david.pfx, похоже, затрагивает ту часть вашего вопроса.