Могу ли я использовать std:: async, не дожидаясь будущего ограничения?
Высокий уровень
Я хочу вызвать некоторые функции без возвращаемого значения в асинхронном режиме, не дожидаясь их завершения. Если я использую std:: async, будущий объект не разрушает, пока задача не закончится, это сделает вызов несинхронизирующим в моем случае.
Пример
void sendMail(const std::string& address, const std::string& message)
{
//sending the e-mail which takes some time...
}
myResonseType processRequest(args...)
{
//Do some processing and valuate the address and the message...
//Sending the e-mail async
auto f = std::async(std::launch::async, sendMail, address, message);
//returning the response ASAP to the client
return myResponseType;
} //<-- I'm stuck here until the async call finish to allow f to be destructed.
// gaining no benefit from the async call.
Мои вопросы
- Есть ли способ преодолеть это ограничение?
- if (1) is no, следует ли реализовать один раз поток, который будет принимать эти фьючерсы "зомби" и ждать на них?
- Является ли (1) и (2) нет, есть ли другой вариант, тогда просто создайте собственный пул потоков?
Примечание:
Я предпочитаю не использовать опцию thread + detach (предложенную @galop1n), так как при создании нового потока есть накладные расходы, которых я хочу избежать. При использовании std:: async (по крайней мере, на MSVC) используется пул внутренних потоков.
Спасибо.
Ответы
Ответ 1
Вы можете перенести будущее в глобальный объект, поэтому при запуске локального будущего деструктора ему не нужно ждать завершения асинхронного потока.
std::vector<std::future<void>> pending_futures;
myResonseType processRequest(args...)
{
//Do some processing and valuate the address and the message...
//Sending the e-mail async
auto f = std::async(std::launch::async, sendMail, address, message);
// transfer the future shared state to a longer-lived future
pending_futures.push_back(std::move(f));
//returning the response ASAP to the client
return myResponseType;
}
N.B. Это небезопасно, если асинхронный поток ссылается на любые локальные переменные в функции processRequest
.
При использовании std::async
(по крайней мере, на MSVC) используется внутренний пул потоков.
В действительности это не соответствует требованиям, стандарт явно указывает, что задачи, выполняемые с помощью std::launch::async
, должны выполняться как в новом потоке, поэтому любые локальные переменные нитей не должны сохраняться из одной задачи в другую. Обычно это не имеет значения.
Ответ 2
почему вы не просто начинаете нить и отсоединяетесь, если не хотите присоединиться?
std::thread{ sendMail, address, message}.detach();
std:: async привязан к времени жизни std:: future, который он возвращает, и их нет альтернативы этому.
Включение std:: future в очереди ожидания, прочитанной другим потоком, потребует того же механизма безопасности, что и пул, получающий новую задачу, например мьютекс вокруг контейнера.
Таким образом, ваш лучший вариант - это пул потоков, чтобы использовать задачи, непосредственно вложенные в поточную безопасную очередь. И это не будет зависеть от конкретной реализации.
Ниже реализации пула потоков, принимающих любые вызываемые аргументы и аргументы, потоки выполняют сеанс в очереди, лучшая реализация должна использовать переменные условия (coliru):
#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>
struct ThreadPool {
struct Task {
virtual void Run() const = 0;
virtual ~Task() {};
};
template < typename task_, typename... args_ >
struct RealTask : public Task {
RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
void Run() const override {
fun_();
}
private:
decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
};
template < typename task_, typename... args_ >
void AddTask( task_&& task, args_&&... args ) {
auto lock = std::unique_lock<std::mutex>{mtx_};
using FinalTask = RealTask<task_, args_... >;
q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
}
ThreadPool() {
for( auto & t : pool_ )
t = std::thread( [=] {
while ( true ) {
std::unique_ptr<Task> task;
{
auto lock = std::unique_lock<std::mutex>{mtx_};
if ( q_.empty() && stop_ )
break;
if ( q_.empty() )
continue;
task = std::move(q_.front());
q_.pop();
}
if (task)
task->Run();
}
} );
}
~ThreadPool() {
{
auto lock = std::unique_lock<std::mutex>{mtx_};
stop_ = true;
}
for( auto & t : pool_ )
t.join();
}
private:
std::queue<std::unique_ptr<Task>> q_;
std::thread pool_[8];
std::mutex mtx_;
volatile bool stop_ {};
};
void foo( int a, int b ) {
std::cout << a << "." << b;
}
void bar( std::string const & s) {
std::cout << s;
}
int main() {
ThreadPool pool;
for( int i{}; i!=42; ++i ) {
pool.AddTask( foo, 3, 14 );
pool.AddTask( bar, " - " );
}
}
Ответ 3
Вместо того, чтобы переместить будущее в глобальный объект (и вручную управлять удалением неиспользуемых фьючерсов), вы можете перенести его в локальную область асинхронной функции.
"Пусть асинхронная функция принимает свое собственное будущее", так сказать.
Я придумал эту оболочку шаблона, которая работает для меня (протестирована в Windows):
#include <future>
template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
std::future<void>&& is_valid, std::promise<void>&& is_moved) {
is_valid.wait(); // Wait until the return value of std::async is written to "future"
auto our_future = std::move(future); // Move "future" to a local variable
is_moved.set_value(); // Only now we can leave void_async in the main thread
// This is also used by std::async so that member function pointers work transparently
auto functor = std::bind(f, std::forward<Args>(args)...);
functor();
}
template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
std::future<void> future; // This is for std::async return value
// This is for our synchronization of moving "future" between threads
std::promise<void> valid;
std::promise<void> is_moved;
auto valid_future = valid.get_future();
auto moved_future = is_moved.get_future();
// Here we pass "future" as a reference, so that async_wrapper
// can later work with std::async return value
future = std::async(
async_wrapper<Function, Args...>,
std::forward<Function>(f), std::forward<Args>(args)...,
std::ref(future), std::move(valid_future), std::move(is_moved)
);
valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
moved_future.wait(); // Wait for "future" to actually be moved
}
Я немного удивлен, что это работает, потому что я думал, что перемещенный будущий деструктор будет блокироваться, пока мы не покинем async_wrapper. Он должен ждать возвращения async_wrapper, но он ждет внутри этой самой функции. Логично, что это должен быть тупик, но это не так.
Я также попытался добавить строку в конец async_wrapper, чтобы вручную удалить будущий объект:
our_future = std::future<void>();
Это также не блокирует.