Понимание примера о возобновляемых функциях в предложении N3650 для С++ 1y

Рассмотрим следующий пример, взятый из N3650:

int cnt = 0;
do {
   cnt = await streamR.read(512, buf);
   if (cnt == 0)
      break;
   cnt = await streamW.write(cnt, buf);
} while (cnt > 0);

Я, вероятно, что-то пропустил, но если бы я хорошо понял async и await, то в чем смысл показать полезность двух конструкций с приведенным выше примером, когда эффекты эквивалентны записи:

int cnt = 0;
do {
   cnt = streamR.read(512, buf).get();
   if (cnt == 0)
      break;
   cnt = streamW.write(cnt, buf).get();
} while (cnt > 0);

где оба вызова read().get() и write().get() синхронны?

Ответы

Ответ 1

Ключевое слово ожидания не равно вызову get on the future. Вы можете посмотреть на это более похоже, предположим, вы начинаете с этого:

future<T> complex_function()
{
     do_some_stuff();
     future<Result> x = await some_async_operation();
     return do_some_other_stuff(x);
}

Это функционально более или менее то же, что и

future<T> complex_function()
{
     do_some_stuff();
     return some_async_operation().then([=](future<Result> x) {
         return do_some_other_stuff(x);
     });
}

Обратите внимание на более или менее, потому что есть некоторые последствия для управления ресурсами, переменные, созданные в do_some_stuff, не должны копироваться для выполнения do_some_other_stuff, как это делает лямбда-версия.

Второй вариант делает более понятным, что произойдет при вызове.

  • do_some_stuff() будет вызываться синхронно при вызове complex_function.
  • some_async_operation называется асинхронно и приводит к будущему. Точный момент выполнения этой операции зависит от вашей фактической реализации асинхронного вызова, она может быть немедленной, когда вы используете потоки, это может быть всякий раз, когда вызывается .get() при использовании отложенного выполнения.
  • Мы не выполняем do_some_other_stuff немедленно, а скорее связываем его с будущим, полученным на шаге 2. Это означает, что он может быть выполнен, как только результат из some_async_operation будет готов, но не раньше. Кроме того, момент выполнения определяется временем выполнения. Если реализация просто завершает предложение then, это означает, что он наследует родительскую политику будущего исполнителя/запуска (согласно N3558).
  • Функция возвращает последнее будущее, которое представляет конечный результат. Обратите внимание, что эти ПОТРЕБНОСТИ - это будущее, так как часть тела функции выполняется асинхронно.

Ответ 2

Более полный пример (надеюсь, правильный):

future<void> forwardMsgs(istream& streamR, ostream& streamW) async
{
    char buf[512];
    int cnt = 0;
    do {
       cnt = await streamR.read(512, buf);
       if (cnt == 0)
          break;
       cnt = await streamW.write(cnt, buf);
    } while (cnt > 0);
}

future<void> fut = forwardMsgs(myStreamR, myStreamW);

/* do something */

fut.get();

Важным моментом является (цитата из черновика):

После приостановки возобновляемая функция может быть возобновлена ​​логикой планирования среды выполнения и в конечном итоге завершит ее логику, после чего она выполнит оператор return (явный или неявный) и задает значение результата функции в заполнителе.

и

Возобновляемая функция может продолжить выполнение в другом потоке после возобновления после приостановки ее выполнения.

То есть, поток, который изначально называл forwardMsgs, может возвращаться в любой точке подвески. Если это так, во время строки /* do something */ код внутри forwardMsgs может быть выполнен другим потоком, хотя функция была вызвана "синхронно".


Этот пример очень похож на

future<void> fut = std::async(forwardMsgs, myStreamR, myStreamW);

/* do something */

fut.get();

Разница заключается в том, что возобновляемая функция может выполняться разными потоками: другой поток может возобновить выполнение (возобновляемой функции) после каждой точки возобновления/приостановки.

Ответ 3

Я думаю, идея состоит в том, что вызовы streamR.read() и streamW.write() представляют собой асинхронные операции ввода-вывода и возвратные фьючерсы, которые автоматически ожидают выражения await.

Таким образом, эквивалентная синхронная версия должна была бы вызвать future::get() для получения результатов, например.

int cnt = 0;
do {
   cnt = streamR.read(512, buf).get();
   if (cnt == 0)
      break;
   cnt = streamW.write(cnt, buf).get();
} while (cnt > 0);

Вы правильно указали, что здесь нет concurrency. Однако в контексте возобновляемой функции await делает поведение отличным от приведенного выше фрагмента. Когда достигается await, функция вернет значение future, поэтому вызывающая функция может продолжать работу без блокировки, даже если возобновляемая функция блокируется при await, ожидая какого-либо другого результата (например, в этом случае read() или write() для завершения.) Функция возобновления может возобновиться асинхронно, поэтому результат становится доступным в фоновом режиме, в то время как вызывающий выполняет что-то еще.

Ответ 4

Здесь правильный перевод функции примера, чтобы не использовать:

struct Copy$StackFrame {
  promise<void> $result;
  input_stream& streamR;
  output_stream& streamW;
  int cnt;
  char buf[512];
};
using Copy$StackPtr = std::shared_ptr<Copy$StackFrame>;

future<void> Copy(input_stream& streamR, output_stream& streamW) {
  Copy$StackPtr $stack{ new Copy$StackFrame{ {}, streamR, streamW, 0 } };
  future<int> f$1 = $stack->streamR.read(512, stack->buf);
  f$1.then([$stack](future<int> f) { Copy$Cont1($stack, std::move(f)); });
  return $stack->$result.get_future();
}

void Copy$Cont1(Copy$StackPtr $stack, future<int> f$1) {
  try {
    $stack->cnt = f$1.get();
    if ($stack->cnt == 0) {
      // break;
      $stack->$result.set_value();
      return;
    }
    future<int> f$2 = $stack->streamW.write($stack->cnt, $stack->buf);
    f$2.then([$stack](future<int> f) { Copy$Cont2($stack, std::move(f)); });
  } catch (...) {
    $stack->$result.set_exception(std::current_exception());
  }
}

void Copy$Cont2(Copy$StackPtr $stack, future<int> f$2) {
  try {
    $stack->cnt = f$2.get();
    // while (cnt > 0)
    if (cnt <= 0) {
      $stack->$result.set_value();
      return;
    }
    future<int> f$1 = $stack->streamR.read(512, stack->buf);
    f$1.then([$stack](future<int> f) { Copy$Cont1($stack, std::move(f)); });
  } catch (...) {
    $stack->$result.set_exception(std::current_exception());
  }
}

Как вы можете видеть, преобразование компилятора здесь довольно сложно. Ключевым моментом здесь является то, что в отличие от версии get() исходный Copy возвращает свое будущее, как только будет выполнен первый асинхронный вызов.

Ответ 5

У меня такая же проблема со значением разницы между этими двумя образцами кода. Пусть re напишет их немного, чтобы быть более полными.

    // Having two functions
    future<void> f (istream&streamR, ostream&streamW) async
    {  int cnt = 0;
       do {
          cnt = await streamR.read(512, buf);
          if (cnt == 0)
             break;
          cnt = await streamW.write(cnt, buf);
       } while (cnt > 0);
    }
    void g(istream&streamR, ostream&streamW)
    {  int cnt = 0;
       do {
          cnt = streamR.read(512, buf).get();
          if (cnt == 0)
             break;
          cnt = streamW.write(cnt, buf).get();
       } while (cnt > 0);
    }
    // what is the difference between
    auto a = f(streamR, streamW);
    // and 
    auto b = async(g, streamR, streamW);

Вам по-прежнему нужно как минимум три стека. В обоих случаях основной поток не блокируется. Предполагается ли, что ожидание будет реализовано компилятором более эффективно, чем будущее < > : get()?. Ну, теперь без ожидания можно использовать.

Спасибо Адам Зелинский