С++ эквивалентно Java BlockingQueue
Я переношу некоторый код Java на С++, и в одном конкретном разделе используется BlockingQueue для передачи сообщений от многих производителей одному потребителю.
Если вы не знакомы с тем, что такое Java BlockingQueue, это просто очередь, которая имеет жесткую емкость, которая предоставляет потокобезопасные методы для put() и take() из очереди. put(), если очередь заполнена, и принимать() блоки, если очередь пуста. Кроме того, поставляются таймерные версии этих методов.
Тайм-ауты имеют отношение к моему прецеденту, поэтому рекомендация, которая поставляет их, идеальна. Если нет, я могу сам составить код.
Я googled вокруг и быстро просматривал библиотеки Boost, и я не нахожу ничего подобного. Может быть, я слепой здесь... но кто-нибудь знает хорошие рекомендации?
Спасибо!
Ответы
Ответ 1
Это не фиксированный размер и он не поддерживает тайм-ауты, но вот простая реализация очереди, которую я недавно опубликовал с использованием конструкций С++ 2011:
#include <mutex>
#include <condition_variable>
#include <deque>
template <typename T>
class queue
{
private:
std::mutex d_mutex;
std::condition_variable d_condition;
std::deque<T> d_queue;
public:
void push(T const& value) {
{
std::unique_lock<std::mutex> lock(this->d_mutex);
d_queue.push_front(value);
}
this->d_condition.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
return rc;
}
};
Должно быть тривиально расширять и использовать время ожидания для всплытия. Основная причина, по которой я этого не сделал, - это то, что я не доволен выборами интерфейса, о которых я думал до сих пор.
Ответ 2
Вот пример очереди блокировки с функцией запроса на отключение :
template <typename T> class BlockingQueue {
std::condition_variable _cvCanPop;
std::mutex _sync;
std::queue<T> _qu;
bool _bShutdown = false;
public:
void Push(const T& item)
{
{
std::unique_lock<std::mutex> lock(_sync);
_qu.push(item);
}
_cvCanPop.notify_one();
}
void RequestShutdown() {
{
std::unique_lock<std::mutex> lock(_sync);
_bShutdown = true;
}
_cvCanPop.notify_all();
}
bool Pop(T &item) {
std::unique_lock<std::mutex> lock(_sync);
for (;;) {
if (_qu.empty()) {
if (_bShutdown) {
return false;
}
}
else {
break;
}
_cvCanPop.wait(lock);
}
item = std::move(_qu.front());
_qu.pop();
return true;
}
};
Ответ 3
Хорошо, я немного опоздал на вечеринку, но я думаю, что это лучше подходит для реализации Java BlockingQueue
. Здесь я тоже использую один мьютекс и два условия, чтобы присматривать за не полностью и не пусто. IMO a BlockingQueue
имеет больше смысла с ограниченными возможностями, которых я не видел в других ответах. Я также включил простой тестовый сценарий:
#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
template<typename T>
class blocking_queue {
private:
size_t _capacity;
std::queue<T> _queue;
std::mutex _mutex;
std::condition_variable _not_full;
std::condition_variable _not_empty;
public:
inline blocking_queue(size_t capacity) : _capacity(capacity) {
// empty
}
inline size_t size() const {
std::unique_lock<std::mutex> lock(_mutex);
return _queue.size();
}
inline bool empty() const {
std::unique_lock<std::mutex> lock(_mutex);
return _queue.empty();
}
inline void push(const T& elem) {
{
std::unique_lock<std::mutex> lock(_mutex);
// wait while the queue is full
while (_queue.size() >= _capacity) {
_not_full.wait(lock);
}
std::cout << "pushing element " << elem << std::endl;
_queue.push(elem);
}
_not_empty.notify_all();
}
inline void pop() {
{
std::unique_lock<std::mutex> lock(_mutex);
// wait while the queue is empty
while (_queue.size() == 0) {
_not_empty.wait(lock);
}
std::cout << "popping element " << _queue.front() << std::endl;
_queue.pop();
}
_not_full.notify_one();
}
inline const T& front() {
std::unique_lock<std::mutex> lock(_mutex);
// wait while the queue is empty
while (_queue.size() == 0) {
_not_empty.wait(lock);
}
return _queue.front();
}
};
int main() {
blocking_queue<int> queue(5);
// create producers
std::vector<std::thread> producers;
for (int i = 0; i < 10; i++) {
producers.push_back(std::thread([&queue, i]() {
queue.push(i);
// produces too fast
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}));
}
// create consumers
std::vector<std::thread> consumers;
for (int i = 0; i < 10; i++) {
producers.push_back(std::thread([&queue, i]() {
queue.pop();
// consumes too slowly
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}));
}
std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
thread.join();
});
std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
thread.join();
});
return EXIT_SUCCESS;
}