Ответ 1
Этот пост в списке рассылки описывает ту же проблему. Хотя CreateFile
с FILE_FLAG_OVERLAPPED
допускает асинхронный ввод-вывод, он не устанавливает его как поток в контексте Boost.Asio. Для потоков Boost.Asio реализует read_some
как read_some_at
со смещением всегда 0
. Это источник проблемы, так как документация ReadFile()
гласит:
Для файлов, поддерживающих смещения байтов, вы должны указать смещение байта, с которого нужно начать чтение из файла.
Адаптация к требованиям типа
Boost.Asio написано очень в целом, часто требуя, чтобы аргументы удовлетворяли определенному требованию типа, а не определенному типу. Поэтому часто можно адаптировать объект ввода-вывода или его службу для получения желаемого поведения. Во-первых, необходимо определить, что должен поддерживать адаптированный интерфейс. В этом случае async_read_until
принимает любой тип, удовлетворяющий требованиям типа AsyncReadStream
. Требования AsyncReadStream
достаточно просты, требуя функцию члена void async_read_some(MutableBufferSequence, ReadHandler)
.
В качестве значения смещения необходимо будет отслеживать всю выполняемую операцию async_read_until
, можно ввести простой тип, отвечающий требованиям ReadHandler, который будет обертывать приложение ReadHandler и соответствующим образом обновлять смещение.
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
Квест asio_handler_invoke
будет найден через ADL для поддержки вызова обработчиков пользователей в соответствующем контексте. Это важно для безопасности протектора, когда вызывается сложная операция в strand
. Подробнее о составных операциях и цепочках см. В этом.
Следующий класс будет адаптировать boost::asio::windows::random_access_handle
для соответствия требованиям типа AsyncReadStream
.
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
В качестве альтернативы boost::asio::windows::basic_stream_handle
может быть предоставлен настраиваемый тип, отвечающий требованиям StreamHandleService и реализовать async_read_some
в терминах async_read_some_at
.
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
Я выбрал простоту в примере кода, но одна и та же услуга будет использоваться несколькими объектами ввода-вывода. Таким образом, offset_stream_handle_service
должен будет управлять смещением на каждого обработчика, чтобы он работал правильно, когда несколько объектов ввода-вывода используют службу.
Чтобы использовать адаптированные типы, измените переменную-член AsyncReader::input_handle
как либо basic_adapted_stream<boost::asio::windows::random_access_handle>
(адаптированный объект ввода-вывода), либо boost::asio::windows::basic_stream_handle<offset_stream_handle_service>
(адаптированная услуга).
Пример
Вот полный пример, основанный на исходном коде, только изменяющий тип AsyncReader::input_handler
:
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
offset_stream_handle_service> adapted_stream;
#endif
class AsyncReader
{
public:
AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
: io_service_(io_service),
input_buffer(/*size*/ 8192),
input_handle(io_service, handle)
{
start_read();
}
void start_read()
{
boost::asio::async_read_until(input_handle, input_buffer, '\n',
boost::bind(&AsyncReader::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, std::size_t length);
// void handle_write(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::streambuf input_buffer;
adapted_stream input_handle;
};
void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
if (!error)
{
static int count = 0;
++count;
// method 1: (same problem)
// const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
// std::string s(pStart, length);
// input_buffer.consume(length);
// method 2:
std::istream is(&input_buffer);
std::string s;
assert(std::getline(is, s));
std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
start_read();
}
else if (error == boost::asio::error::not_found)
std::cerr << "Did not receive ending character!\n";
else
std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::asio::io_service io_service;
HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
GENERIC_READ,
0, // share mode
NULL, // security attribute: NULL = default
OPEN_EXISTING, // creation disposition
FILE_FLAG_OVERLAPPED,
NULL // template file
);
AsyncReader obj(io_service, handle);
io_service.run();
std::cout << "Normal termination\n";
getchar();
return 0;
}
Что дает следующий результат при использовании ввода исходного вопроса:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' Misc error during read! Normal termination
У моего входного файла не было символа \n
в конце LINE F. Таким образом, AsyncReader::handle_read()
вызывается с ошибкой boost::asio::error::eof
и input_buffer
содержимого, содержащего LINE F. После изменения окончательного else case для печати дополнительной информации:
...
else
{
std::cerr << "Error: " << error.message() << "\n";
if (std::size_t buffer_size = input_buffer.size())
{
boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
std::string contents(boost::asio::buffers_begin(bufs),
boost::asio::buffers_begin(bufs) + buffer_size);
std::cerr << "stream contents: '" << contents << "'\n";
}
}
Я получаю следующий вывод:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' Error: End of file stream contents: 'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' Normal termination