Остановка С++ 11 std:: threads, ожидающие на std:: condition_variable
Я пытаюсь понять основные механизмы многопоточности в новом стандарте С++ 11. Самый простой пример, который я могу придумать, следующий:
- Производитель и потребитель реализуются в отдельных потоках.
- Производитель помещает определенное количество элементов внутри очереди
- Потребитель берет элементы из очереди, если есть какие-либо существующие
Этот пример также используется во многих школьных книгах о многопоточности, и все, что касается процесса коммуникации, отлично работает. Однако у меня есть проблема, когда дело доходит до остановки потребительского потока.
Я хочу, чтобы потребитель работал до тех пор, пока он не получил явный сигнал остановки (в большинстве случаев это означает, что я жду, пока продюсер закончит, чтобы я мог остановить пользователя до завершения программы). К сожалению, в потоках С++ 11 отсутствует механизм прерывания (который я знаю из многопоточности в Java, например). Таким образом, я должен использовать такие флаги, как isRunning
, чтобы сигнализировать, что я хочу остановить поток.
Основная проблема заключается в следующем: после того как я остановил поток производителя, очередь пуста и потребитель ждет на condition_variable
, чтобы получить сигнал, когда очередь будет заполнена снова. Поэтому мне нужно разбудить поток, вызвав notify_all()
в переменной перед выходом.
Я нашел рабочее решение, но оно кажется каким-то грязным.
Ниже приведен пример кода (извините, но каким-то образом я не смог уменьшить размер кода для любого минимального минимального примера):
Класс Queue:
class Queue{
public:
Queue() : m_isProgramStopped{ false } { }
void push(int i){
std::unique_lock<std::mutex> lock(m_mtx);
m_q.push(i);
m_cond.notify_one();
}
int pop(){
std::unique_lock<std::mutex> lock(m_mtx);
m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });
if (m_isProgramStopped){
throw std::exception("Program stopped!");
}
int x = m_q.front();
m_q.pop();
std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
return x;
}
void stop(){
m_isProgramStopped = true;
m_cond.notify_all();
}
private:
std::queue<int> m_q;
std::mutex m_mtx;
std::condition_variable m_cond;
bool m_isProgramStopped;
};
Продюсер:
class Producer{
public:
Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }
void produce(){
for (int i = 0; i < 5; i++){
m_q.push(m_counter++);
std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
}
}
void execute(){
m_t = std::thread(&Producer::produce, this);
}
void join(){
m_t.join();
}
private:
Queue & m_q;
std::thread m_t;
unsigned int m_counter;
};
Потребитель:
class Consumer{
public:
Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
{ }
~Consumer(){
std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
}
void consume(){
while (m_isRunning){
try{
m_q.pop();
m_takeCounter++;
}
catch (std::exception e){
std::cout << "Program was stopped while waiting." << std::endl;
}
}
}
void execute(){
m_t = std::thread(&Consumer::consume, this);
}
void join(){
m_t.join();
}
void stop(){
m_isRunning = false;
}
private:
Queue & m_q;
std::thread m_t;
unsigned int m_takeCounter;
bool m_isRunning;
};
И, наконец, main()
:
int main(void){
Queue q;
Consumer cons{ q };
Producer prod{ q };
cons.execute();
prod.execute();
prod.join();
cons.stop();
q.stop();
cons.join();
std::cout << "END" << std::endl;
return EXIT_SUCCESS;
}
Правильно ли это закончить поток, ожидающий переменную условия, или есть лучшие методы? В настоящее время очередь должна знать, остановлена ли программа (которая, на мой взгляд, уничтожает свободную связь компонентов), и мне нужно вызвать stop()
в очереди явно, что кажется неправильным.
Дополнительно, переменная условия, которая должна использоваться только как сингл, если очередь пуста, теперь означает другое условие - если программа закончилась. Если я не ошибаюсь, каждый раз, когда поток ожидает переменную условия для какого-либо события, он также должен будет проверить, нужно ли остановить поток, прежде чем продолжить его выполнение (что также кажется неправильным).
Есть ли у меня эти проблемы, потому что вся моя конструкция неисправна или мне не хватает некоторых механизмов, которые можно использовать для чистого потока потоков?
Ответы
Ответ 1
Нет, нет ничего плохого в вашем дизайне, и это нормальный подход, применяемый к этой проблеме.
Совершенно верно для вас иметь несколько условий (например, что-либо в очереди или остановке программы), привязанных к переменной условия. Главное, что биты в условии проверяются, когда возвращается wait
.
Вместо того, чтобы иметь флаг в Queue
, чтобы указать, что программа останавливается, вы должны думать о том, что флаг "я могу принять". Это лучшая общая парадигма и работает лучше в многопоточной среде.
Кроме того, вместо pop
вызывать исключение, если кто-то его вызывает и stop
был вызван, вы можете заменить метод bool try_pop(int &value)
, который вернет true
, если значение было возвращено, иначе false
, Таким образом, вызывающий может проверить, не удалось ли остановить очередь (добавьте метод bool is_stopped() const
). Хотя обработка исключений работает здесь, она немного тяжелая, и на самом деле это не исключительный случай в многопоточной программе.
Ответ 2
wait
можно вызвать с таймаутом. Элемент управления возвращается в поток, и stop
можно проверить. В зависимости от этого значения он может wait
на большее количество предметов, которые будут потребляться или закончить выполнение. Хорошим введением в многопоточность с С++ является С++ 11 Concurrency.