Ответ 1
среда выполнения С++ должна отслеживать количество созданных потоков и правильно настроить std:: async
Нет. Если асинхронные задачи фактически выполняются асинхронно (а не отложено), то все, что требуется, это то, что они запускаются, как если бы они были в новом потоке. Это совершенно верно для нового потока, который будет создан и запущен для каждой задачи, без какой-либо ограниченной аппаратной возможности для parallelism.
Есть примечание:
[Примечание. Если эта политика указана вместе с другими политиками, например, при использовании значения политики запуска:: async | запуск:: отложено, реализации должны отложить вызов или выбрать политику когда не более concurrency можно эффективно использовать. -end note]
Однако это ненормативно, и в любом случае это указывает на то, что когда-либо более concurrency может быть использовано, задачи могут откладываться и, следовательно, выполняться, когда кто-то ждет результата, а не остается асинхронным и запущенным сразу после завершения одной из предыдущих асинхронных задач, что было бы желательно для максимального parallelism.
То есть, если у нас есть 10 длительных задач, и реализация может выполнять только 4 параллельно, то первые 4 будут асинхронными, а затем последние 6 могут быть отложены. Ожидание будущих фьючерсов будет выполнять отложенные задачи по одному потоку в последовательности, исключая параллельное выполнение для этих задач.
В примечании также говорится, что вместо отложенного вызова выбор политики может быть отложен. То есть функция все еще может выполняться асинхронно, но это решение может быть отложено, скажем, до тех пор, пока одна из ранних задач не завершится, освободив ядро для новой задачи. Но опять же, это не требуется, нота является ненормативной, и насколько я знаю, реализация Microsoft является единственной, которая ведет себя таким образом. Когда я посмотрел на другую реализацию libС++, она просто полностью игнорирует эту заметку, так что использование политик std::launch::async
или std::launch::any
приводит к асинхронному выполнению нового потока.
(Я считаю, что в случае Microsoft их библиотека ConcRT несет ответственность за это?)
Реализация Microsoft действительно ведет себя так, как вы описываете, но это не требуется, и переносная программа не может полагаться на это поведение.
Один из способов портативного ограничения того, сколько потоков фактически выполняется, - использовать что-то вроде семафора:
#include <future>
#include <mutex>
#include <cstdio>
// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be be doing work at any time.
//
class Semaphore {
private:
std::mutex m;
std::condition_variable cv;
unsigned int count;
public:
Semaphore(int n) : count(n) {}
void notify() {
std::unique_lock<std::mutex> l(m);
++count;
cv.notify_one();
}
void wait() {
std::unique_lock<std::mutex> l(m);
cv.wait(l, [this]{ return count!=0; });
--count;
}
};
// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
Semaphore &s;
public:
Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
~Semaphore_waiter_notifier() { s.notify(); }
};
// some inefficient work for our threads to do
int fib(int n) {
if (n<2) return n;
return fib(n-1) + fib(n-2);
}
// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
Container::size_type i = 0;
for (auto &e : c)
f(i++, e);
return f;
}
// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);
int main() {
std::vector<int> input(100);
for_each(input, [](int i, int &e) { e = (i%10) + 35; });
std::vector<std::future<int>> output;
for_each(input, [&output](int i, int e) {
output.push_back(std::async(std::launch::async, [] (int task, int n) -> int {
Semaphore_waiter_notifier w(thread_limiter);
std::printf("Starting task %d\n", task);
int res = fib(n);
std::printf("\t\t\t\t\t\tTask %d finished\n", task);
return res;
}, i, e));
});
for_each(output, [](int i, std::future<int> &e) {
std::printf("\t\t\tWaiting on task %d\n", i);
int res = e.get();
std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
});
}