Ответ 1
Короче говоря, вам нужно обернуть предоставленную пользователем задачу другой функцией, которая будет:
- Вызов функции пользователя или вызываемого объекта.
- Блокировка мьютекса и уменьшение счетчика.
Возможно, я не понимаю всех требований к этому пулу потоков. Таким образом, для ясности здесь приведен явный список того, что, по моему мнению, является следующим:
- Пул управляет временем жизни потоков. Пользователь не должен удалять потоки, которые находятся в пуле.
- Пользователь может назначить задачу пулу неинтрузивным способом.
- Когда задача назначается, если все потоки в пуле в настоящее время запускают другие задачи, задача отбрасывается.
Прежде чем я расскажу о реализации, есть несколько ключевых моментов, которые я хотел бы подчеркнуть:
- Как только поток запущен, он будет работать до завершения, отмены или завершения. Функция, выполняемая нитью, не может быть переназначена. Чтобы позволить одному потоку выполнять несколько функций в течение своей жизни, поток будет запускаться с функцией, которая будет считываться из очереди, например
io_service::run()
, и вызываемые типы отправляются в очередь событий, например как отio_service::post()
. -
io_service::run()
возвращает, если вio_service
нет ожидающей работы,io_service
останавливается или исключение вызывается из обработчика, в котором работает поток. Чтобы предотвратить возвратio_serivce::run()
при отсутствии незавершенной работы, можно использовать классio_service::work
. - Определение требований типа задачи (т.е. тип задачи должен быть вызван синтаксисом
object()
) вместо того, чтобы требовать тип (то есть задача должна наследовать отprocess
), обеспечивает большую гибкость для пользователя. Он позволяет пользователю задавать задачу как указатель функции или тип, предоставляющий нулевое значениеoperator()
.
Реализация с использованием boost::asio
:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
Несколько комментариев о реализации:
- Обработка исключений должна выполняться вокруг пользовательской задачи. Если пользовательская функция или вызываемый объект генерирует исключение, которое не относится к типу
boost::thread_interrupted
, вызываетсяstd::terminate()
. Это результат Boost.Thread исключения в функциях потоков. Также стоит прочитать Boost.Asio эффект исключений, отбрасываемых обработчиками. - Если пользователь предоставляет
task
черезboost::bind
, тогда вложенныйboost::bind
не сможет скомпилироваться. Требуется один из следующих вариантов:- Не поддерживается
task
, созданныйboost::bind
. - Мета-программирование для выполнения ветвления разбора во время компиляции на основе того, будет ли пользователь вводить, если результат
boost::bind
, чтобыboost::protect
мог использоваться, посколькуboost::protect
работает только на определенных объектах функций. - Используйте другой тип, чтобы косвенно передать объект
task
. Я решил использоватьboost::function
для удобства чтения за счет потери точного типа.boost::tuple
, хотя и немного менее читаемый, также можно использовать для сохранения точного типа, как показано в примере Boost.Asio serialization.
- Не поддерживается
Код приложения теперь может использовать тип thread_pool
неинтрузивно:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
thread_pool
может быть создан без Boost.Asio и может быть немного проще для сопровождающих, поскольку им больше не нужно знать о поведениях Boost.Asio
, например, когда возвращается io_service::run()
, а что означает io_service::work
объект:
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};