Boost asio async_write: как не чередовать вызовы async_write?
Здесь моя реализация:
- Клиент A отправить сообщение для клиента B
- Сервер обрабатывает сообщение
async_read
нужным количеством данных и
будет ждать новых данных от клиента A (в порядке, чтобы не блокировать клиента A)
- После этого сервер обработает информацию (возможно, mysql
запрос), а затем отправить сообщение клиенту B с помощью
async_write
.
Проблема в том, что если сообщение "A" посылает сообщение очень быстро, async_writes
будет чередоваться до вызова предыдущего обработчика async_write.
Есть ли простой способ избежать этой проблемы?
ИЗМЕНИТЬ 1:
Если клиент C отправляет сообщение клиенту B сразу после клиента A, должна появиться та же проблема...
ИЗМЕНИТЬ 2:
Это сработает? потому что он, кажется, блокируется, я не знаю, где...
namespace structure {
class User {
public:
User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
m_socket(io_service, context), m_strand(io_service), is_writing(false) {}
ssl_socket& getSocket() {
return m_socket;
}
boost::asio::strand getStrand() {
return m_strand;
}
void push(std::string str) {
m_strand.post(boost::bind(&structure::User::strand_push, this, str));
}
void strand_push(std::string str) {
std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;
m_queue.push(str);
if (!is_writing) {
write();
std::cout << "going to write" << std::endl;
}
std::cout << "Already writing" << std::endl;
}
void write() {
std::cout << "writing" << std::endl;
is_writing = true;
std::string str = m_queue.front();
boost::asio::async_write(m_socket,
boost::asio::buffer(str.c_str(), str.size()),
boost::bind(&structure::User::sent, this)
);
}
void sent() {
std::cout << "sent" << std::endl;
m_queue.pop();
if (!m_queue.empty()) {
write();
return;
}
else
is_writing = false;
std::cout << "done sent" << std::endl;
}
private:
ssl_socket m_socket;
boost::asio::strand m_strand;
std::queue<std::string> m_queue;
bool is_writing;
};
}
#endif
Ответы
Ответ 1
Есть ли простой способ избежать этой проблемы?
Да, поддерживайте исходящую очередь для каждого клиента. Проверьте размер очереди в обработчике завершения async_write
, если ненулевое значение, запустите другую операцию async_write
. Вот пример
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <deque>
#include <iostream>
#include <string>
class Connection
{
public:
Connection(
boost::asio::io_service& io_service
) :
_io_service( io_service ),
_strand( _io_service ),
_socket( _io_service ),
_outbox()
{
}
void write(
const std::string& message
)
{
_strand.post(
boost::bind(
&Connection::writeImpl,
this,
message
)
);
}
private:
void writeImpl(
const std::string& message
)
{
_outbox.push_back( message );
if ( _outbox.size() > 1 ) {
// outstanding async_write
return;
}
this->write();
}
void write()
{
const std::string& message = _outbox[0];
boost::asio::async_write(
_socket,
boost::asio::buffer( message.c_str(), message.size() ),
_strand.wrap(
boost::bind(
&Connection::writeHandler,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
)
);
}
void writeHandler(
const boost::system::error_code& error,
const size_t bytesTransferred
)
{
_outbox.pop_front();
if ( error ) {
std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
return;
}
if ( !_outbox.empty() ) {
// more messages to send
this->write();
}
}
private:
typedef std::deque<std::string> Outbox;
private:
boost::asio::io_service& _io_service;
boost::asio::io_service::strand _strand;
boost::asio::ip::tcp::socket _socket;
Outbox _outbox;
};
int
main()
{
boost::asio::io_service io_service;
Connection foo( io_service );
}
некоторые ключевые моменты
-
boost::asio::io_service::strand
защищает доступ к Connection::_outbox
- обработчик отправляется из
Connection::write()
, поскольку он является общедоступным
для меня не было очевидным, если вы использовали аналогичные методы в примере в своем вопросе, поскольку все методы являются общедоступными.
Ответ 2
Просто пытаюсь улучшить Сэм отличный ответ. Точками улучшения являются:
-
async_write
пытается выполнить отправку каждого байта из буфера (ов) перед завершением, что означает, что вы должны предоставить все входные данные, которые у вас есть, для операции записи, иначе служебные данные кадрирования могут увеличиваться из-за того, что пакеты TCP меньше, чем они могли бы быть.
-
asio::streambuf
, будучи очень удобным в использовании, не является нулевой копией. В приведенном ниже примере демонстрируется подход с нулевой копией: сохраняйте куски входных данных там, где они есть, и используйте перегрузку/собирать перегрузку async_write
, которая принимает последовательность входных буферов (которые являются только указателями на фактические входные данные).
Полный исходный код:
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>
using namespace std::chrono_literals;
using boost::asio::ip::tcp;
class Server
{
class Connection : public std::enable_shared_from_this<Connection>
{
friend class Server;
void ProcessCommand(const std::string& cmd) {
if (cmd == "stop") {
server_.Stop();
return;
}
if (cmd == "") {
Close();
return;
}
std::thread t([this, self = shared_from_this(), cmd] {
for (int i = 0; i < 30; ++i) {
Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n");
}
server_.io_service_.post([this, self] {
DoReadCmd();
});
});
t.detach();
}
void DoReadCmd() {
read_timer_.expires_from_now(server_.read_timeout_);
read_timer_.async_wait([this](boost::system::error_code ec) {
if (!ec) {
std::cout << "Read timeout\n";
Shutdown();
}
});
boost::asio::async_read_until(socket_, buf_in_, '\n', [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) {
read_timer_.cancel();
if (!ec) {
const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data());
std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1));
buf_in_.consume(bytes_read);
ProcessCommand(cmd);
}
else {
Close();
}
});
}
void DoWrite() {
active_buffer_ ^= 1; // switch buffers
for (const auto& data : buffers_[active_buffer_]) {
buffer_seq_.push_back(boost::asio::buffer(data));
}
write_timer_.expires_from_now(server_.write_timeout_);
write_timer_.async_wait([this](boost::system::error_code ec) {
if (!ec) {
std::cout << "Write timeout\n";
Shutdown();
}
});
boost::asio::async_write(socket_, buffer_seq_, [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) {
write_timer_.cancel();
std::lock_guard<std::mutex> lock(buffers_mtx_);
buffers_[active_buffer_].clear();
buffer_seq_.clear();
if (!ec) {
std::cout << "Wrote " << bytes_transferred << " bytes\n";
if (!buffers_[active_buffer_ ^ 1].empty()) // have more work
DoWrite();
}
else {
Close();
}
});
}
bool Writing() const { return !buffer_seq_.empty(); }
Server& server_;
boost::asio::streambuf buf_in_;
std::mutex buffers_mtx_;
std::vector<std::string> buffers_[2]; // a double buffer
std::vector<boost::asio::const_buffer> buffer_seq_;
int active_buffer_ = 0;
bool closing_ = false;
bool closed_ = false;
boost::asio::deadline_timer read_timer_, write_timer_;
tcp::socket socket_;
public:
Connection(Server& server) : server_(server), read_timer_(server.io_service_), write_timer_(server.io_service_), socket_(server.io_service_) {
}
void Start() {
socket_.set_option(tcp::no_delay(true));
DoReadCmd();
}
void Close() {
closing_ = true;
if (!Writing())
Shutdown();
}
void Shutdown() {
if (!closed_) {
closing_ = closed_ = true;
boost::system::error_code ec;
socket_.shutdown(tcp::socket::shutdown_both, ec);
socket_.close();
server_.active_connections_.erase(shared_from_this());
}
}
void Write(std::string&& data) {
std::lock_guard<std::mutex> lock(buffers_mtx_);
buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
if (!Writing())
DoWrite();
}
};
void DoAccept() {
if (acceptor_.is_open()) {
auto session = std::make_shared<Connection>(*this);
acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) {
if (!ec) {
active_connections_.insert(session);
session->Start();
}
DoAccept();
});
}
}
boost::asio::io_service io_service_;
tcp::acceptor acceptor_;
std::unordered_set<std::shared_ptr<Connection>> active_connections_;
const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30);
const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30);
public:
Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { }
void Run() {
std::cout << "Listening on " << acceptor_.local_endpoint() << "\n";
DoAccept();
io_service_.run();
}
void Stop() {
acceptor_.close();
{
std::vector<std::shared_ptr<Connection>> sessionsToClose;
copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose));
for (auto& s : sessionsToClose)
s->Shutdown();
}
active_connections_.clear();
io_service_.stop();
}
};
int main() {
try {
Server srv(8888);
srv.Run();
}
catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << "\n";
}
}