Boost: реализация пула потоков asio для периодически синхронизированных задач
У меня есть "основная" функция, выполняющая множество небольших независимых задач каждый раз за каждый шаг времени. Однако после каждого шага времени я должен дождаться завершения всех задач перед тем, как сделать шаг вперед.
Я хочу сделать программу многопоточной. Я пробовал реализации с потоком потока boost-offshoot, и я попытался использовать вектор потоков (разделяемых указателей на), и я пробовал идеи asio threadpool (используя io_service, устанавливая некоторую работу, а затем распространяя запуск на потоки и проводки обработчиков в io_service).
Все из них, похоже, имеют множество накладных задач, создающих и уничтожающих потоки для моих "многих небольших задач", и я хочу, чтобы, предпочтительно, используя инструменты asio, создать экземпляр одного io_service, одного потока_группы, обработчиков проводки для io_service, и ожидание завершения одного рабочего шага, прежде чем отправлять больше заданий. Есть ли хороший способ сделать это? Здесь (урезанный) код для того, что я сейчас работаю:
boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
io_service.reset();
boost::thread_group threads;
// scoping to destroy the work object after work is finished being assigned
{
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
for(int i = 0; i < numSmallTasks; ++i)
{
io_service.post(boost::bind(&process_data, i, theTime));
}
}
threads.join_all();
}
Вот что у меня было (но не знаю, как реализовать):
boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
for(int theTime = 0; theTime != totalTime; ++theTime)
{
for(int i = 0; i < numSmallTasks; ++i)
{
io_service.post(boost::bind(&process_data, i, theTime));
}
// wait here until all of these tasks are finished before looping
// **** how do I do this? *****
}
// destroy work later and join all threads later...
Ответы
Ответ 1
Вы можете использовать futures для обработки данных и синхронизировать их с помощью boost::wait_for_all()
. Это позволит вам работать с точки зрения выполняемых частей, а не потоков.
int process_data() {...}
// Pending futures
std::vector<boost::unique_future<int>> pending_data;
for(int i = 0; i < numSmallTasks; ++i)
{
// Create task and corresponding future
// Using shared ptr and binding operator() trick because
// packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable
// Boost 1.51 syntax
// For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
typedef boost::packaged_task<int> task_t;
boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
boost::bind(&process_data, i, theTime));
boost::unique_future<int> fut = task->get_future();
pending_data.push_back(std::move(fut));
io_service.post(boost::bind(&task_t::operator(), task));
}
// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end());
Ответ 2
Возможно, вы можете использовать boost:: барьер следующим образом:
void thread_proc( boost::barrier& b ) {
while( true ) {
if( !ioservice.run_one() ) break; // io_service stopped
b.wait();
}
}
Ответ 3
Метод Rost по существу работает, но boost:: make_shared не может компилироваться как есть. Ниже приведена рабочая версия (vs2012):
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/function_types/result_type.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
std::vector<boost::unique_future<void>> pending_data;
typedef boost::packaged_task<void> task_t;
boost::shared_ptr< boost::packaged_task<void> > pt(new boost::packaged_task<void> ([&,i](){...}));
boost::unique_future<void> result = pt->get_future();
pending_data.push_back(boost::move(result));
io_service.post(boost::bind(&task_t::operator(), pt));
boost::wait_for_all(pending_data.begin(), pending_data.end());
pending_data.clear();
Он не будет компилироваться, если аргумент use в параметре packaged_task typedef. Этот пул потоков по asio и будущему методу сохранял только 8% времени по сравнению с каждым циклом, создавая новые методы потоков.