С++ 11 потоковая безопасность
Проект, над которым я работаю, использует несколько потоков для работы с коллекцией файлов. Каждый поток может добавлять файлы в список файлов, которые нужно обработать, поэтому я собрал (как я думал) потокобезопасную очередь. Соответствующие части следуют:
// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
// notify waiting threads of a new item in the queue
void FileQueue::enqueue(std::string&& filename)
{
std::lock_guard<std::mutex> lock(qMutex);
q.push(std::move(filename));
// Notify anyone waiting for additional files that more have arrived
populatedNotifier.notify_one();
}
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if (q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
std::string ret = q.front();
q.pop();
return ret;
}
else {
return std::string();
}
}
else {
std::string ret = q.front();
q.pop();
return ret;
}
}
Тем не менее, я иногда сталкиваюсь внутри блока if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }
, и проверка в gdb указывает, что segfaults происходят, потому что очередь пуста. Как это возможно? Полагаю, что wait_for
возвращает cv_status::no_timeout
только когда он был уведомлен, и это должно произойти только после того, как FileQueue::enqueue
просто нажал новый элемент в очередь.
Ответы
Ответ 1
В соответствии со стандартом condition_variables
разрешено просыпаться ложно, даже если событие не произошло. В случае ложного пробуждения он вернет cv_status::no_timeout
(поскольку он проснулся, а не тайм-аут), хотя он не был уведомлен. Правильное решение для этого, конечно, должно проверить, действительно ли пробуждение было законным, прежде чем обрабатывать.
Подробности указаны в стандарте §30.5.1 [thread.condition.condvar]:
- Функция будет разблокирована, когда будет сообщено о вызове notify_one(), вызове notify_all(), истечении абсолютного таймаута (30.2.4), заданного abs_time, или ложно.
...
Возвращает: cv_status:: timeout, если истек абсолютный тайм-аут (30.2.4), указанный в abs_time, other-ise cv_status:: no_timeout.
Ответ 2
Просто посмотрев на это, когда вы проверяете переменную условия, лучше использовать цикл while (так что если он просыпается и все еще недействителен, вы снова проверяете). Я просто написал шаблон для очереди async, надеюсь, что это поможет.
#ifndef SAFE_QUEUE
#define SAFE_QUEUE
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void)
: q()
, m()
, c()
{}
~SafeQueue(void)
{}
// Add an element to the queue.
void enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(t);
c.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue(void)
{
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif
Ответ 3
Вероятно, вы должны это сделать:
void push(std::string&& filename)
{
{
std::lock_guard<std::mutex> lock(qMutex);
q.push(std::move(filename));
}
populatedNotifier.notify_one();
}
bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
return false;
filename = std::move(q.front());
q.pop();
return true;
}
Ответ 4
Добавляя к принятому ответу, я бы сказал, что реализация правильной очереди нескольких производителей/нескольких потребителей сложна (проще, поскольку С++ 11).
Я бы посоветовал вам попробовать (очень хорошую) блокировку бесплатной библиотеки ускорения, структура "очереди" будет делать то, что вы хотите, без ожидания/блокировки -бесплатные гарантии и без использования компилятора С++ 11.
Я добавляю этот ответ сейчас, потому что библиотека, свободная от блокировки, совершенно новая для повышения (начиная с 1.53, я считаю)
Ответ 5
Я бы переписал вашу функцию dequeue как:
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
while(q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout )
return std::string();
}
std::string ret = q.front();
q.pop();
return ret;
}
Он короче и не имеет повторяющегося кода, подобного вашему. Только выпуск может подождать дольше этого таймаута. Чтобы предотвратить то, что вам нужно будет запомнить время начала до цикла, проверьте тайм-аут и отрегулируйте время ожидания соответственно. Или укажите абсолютное время в состоянии ожидания.
Ответ 6
Я некоторое время назад сталкивался с одной и той же проблемой, а затем писал в С++ 11 шаблон GNU thread-safe-asynchronous-queue. Я опубликовал в своем блоге:
http://gnodebian.blogspot.com.es/2013/07/a-thread-safe-asynchronous-queue-in-c11.html
Ответ 7
В этом случае также есть решение GLib, я еще не пробовал, но считаю, что это хорошее решение.
https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new