Сложить/уменьшить по списку фьючерсов с ассоциативно-коммутативным оператором
Рассмотрим следующее:
import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global
def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => future(slowInt(i)))
def timeFuture(fn: => Future[_]) = {
val t0 = System.currentTimeMillis
Await.result(fn, Inf)
println((System.currentTimeMillis - t0) / 1000.0 + "s")
}
обе следующие печати ~ 2.5s:
// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }
// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }
Насколько я понимаю, причиной этого является то, что Future.reduce/traverse
является общим и, следовательно, не работает быстрее с ассоциативным оператором, однако есть ли простой способ определить вычисление, в котором начнется свертывание/восстановление как только будет доступно хотя бы 2 значения (или 1 в случае fold
), так что, пока некоторые элементы в списке все еще сгенерированы, уже сгенерированные уже вычисляются на?
Ответы
Ответ 1
Scalaz имеет реализацию фьючерсов, которая включает chooseAny
, который берет коллекцию фьючерсов и возвращает будущее кортежа первого завершенного элемента и остальной части фьючерса:
def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]
Twitter реализации фьючерсов называет это select
. Стандартная библиотека не включает его (но см. Viktor Klang реализация, о которой говорил Сом Снытт выше). Я буду использовать версию Scalaz здесь, но перевод должен быть простым.
Один из подходов к выполнению операций, по вашему желанию, состоит в том, чтобы вытащить два завершенных элемента из списка, направить будущее их суммы обратно в список и перезаписать (см. this gist для полного рабочего примера):
def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
_.flatMap {
case (hv, tf) =>
Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
_.flatMap {
case (hv2, tf2) => collapse(Future(hv |+| hv2) +: tf2)
}
)
}
)
В вашем случае вы вызываете что-то вроде этого:
timeFuture(
collapse(futures)(
Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
)
)
Это работает всего лишь на 1,6 секунды на моем двухъядерном ноутбуке, поэтому он работает как ожидается (и будет продолжать делать то, что вы хотите, даже если время, затраченное на slowInt
, меняется).
Ответ 2
Чтобы получить похожие тайминги, мне пришлось использовать локальный ExecutionContext (здесь):
implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
После этого я получил лучшую производительность, разделив список и запустив работу над каждым списком, назначив их в vals (на основе запоминания того, что фьючерсы в for-comprehenion обрабатываются по порядку, если они не назначены vals перед для-comprehenion). Из-за ассоциативной природы списков я мог бы повторно объединить их с еще одним вызовом той же функции. Я изменил функцию timeFuture
, чтобы взять описание и распечатать результат добавления:
def timeFuture(desc: String, fn: => Future[_]) = {
val t0 = System.currentTimeMillis
val res = Await.result(fn, Inf)
println(desc + " = " + res + " in " + (System.currentTimeMillis - t0) / 1000.0 + "s")
}
Я новичок в Scala, поэтому я все еще работаю над повторным использованием той же функции на последнем этапе (я думаю, это должно быть возможно), поэтому я обманул и создал вспомогательную функцию:
def futureSlowAdd(x: Int, y: Int) = future(slowAdd(x, y))
Тогда я мог бы сделать следующее:
timeFuture( "reduce", { Future.reduce(futures)(slowAdd) } )
val right = Future.reduce(futures.take(10))(slowAdd)
val left = Future.reduce(futures.takeRight(10))(slowAdd)
timeFuture( "split futures", (right zip left) flatMap (futureSlowAdd _).tupled)
С этим последним zip и т.д. здесь.
Я думаю, что это параллелизирует работу и рекомбинирует результаты. Когда я запускаю те, я получаю:
reduce = 210 in 2.111s
split futures = 210 in 1.201s
Я использовал жестко закодированную пару взяток, но моя идея состоит в том, что полное расщепление списков может быть введено в функцию и фактически повторно использовать ассоциативную функцию, переданную как в правую, так и в левую ветки (с разрешенными слегка несбалансированными деревьями из-за остатков) в конце.
Когда я рандомизирую функции slowInt()
и slowAdd()
, такие как:
def rand(): Int = Random.nextInt(3)+1
def slowInt(i: Int) = { Thread.sleep(rand()*100); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(rand()*100); x + y }
Я все еще вижу, что "раскол фьючерсов" заканчивается раньше, чем "уменьшить". Кажется, есть некоторые накладные расходы для запуска, что влияет на первый вызов timeFuture
. Вот несколько примеров их запуска со стартовым штрафом за "раздельные фьючерсы":
split futures = 210 in 2.299s
reduce = 210 in 4.7s
split futures = 210 in 2.594s
reduce = 210 in 3.5s
split futures = 210 in 2.399s
reduce = 210 in 4.401s
На более быстром компьютере, чем мой ноутбук, и используя тот же ExecutionContext в вопросе, я не вижу таких больших различий (без рандомизации в медленных * функциях):
split futures = 210 in 2.196s
reduce = 210 in 2.5s
Здесь, похоже, "раздельные фьючерсы" ведут лишь немного.
Один последний раз. Здесь функция (ака мерзости), которая расширяет идею, которую я имел выше:
def splitList[A <: Any]( f: List[Future[A]], assocFn: (A, A) => A): Future[A] = {
def applyAssocFn( x: Future[A], y: Future[A]): Future[A] = {
(x zip y) flatMap( { case (a,b) => future(assocFn(a, b)) } )
}
def divideAndConquer( right: List[Future[A]], left: List[Future[A]]): Future[A] = {
(right, left) match {
case(r::Nil, Nil) => r
case(Nil, l::Nil) => l
case(r::Nil, l::Nil) => applyAssocFn( r, l )
case(r::Nil, l::ls) => {
val (l_right, l_left) = ls.splitAt(ls.size/2)
val lret = applyAssocFn( l, divideAndConquer( l_right, l_left ) )
applyAssocFn( r, lret )
}
case(r::rs, l::Nil) => {
val (r_right, r_left) = rs.splitAt(rs.size/2)
val rret = applyAssocFn( r, divideAndConquer( r_right, r_left ) )
applyAssocFn( rret, l )
}
case (r::rs, l::ls) => {
val (r_right, r_left) = rs.splitAt(rs.size/2)
val (l_right, l_left) = ls.splitAt(ls.size/2)
val tails = applyAssocFn(divideAndConquer( r_right, r_left ), divideAndConquer( l_right, l_left ))
val heads = applyAssocFn(r, l)
applyAssocFn( heads, tails )
}
}
}
val( right, left ) = f.splitAt(f.size/2)
divideAndConquer( right, left )
}
Для разбиения списка вверх по хвосту рекурсивно требуется все, что угодно: Scala и быстро присваивать фьючерсы значениям (для их запуска).
Когда я тестирую его так:
timeFuture( "splitList", splitList( futures.toList, slowAdd) )
Я получаю следующие тайминги на своем ноутбуке с помощью newCachedThreadPool()
:
splitList = 210 in 0.805s
split futures = 210 in 1.202s
reduce = 210 in 2.105s
Я заметил, что тайминги "split futures" могут быть недействительными, поскольку фьючерсы запускаются за пределами блока timeFutures
. Однако функцию splitList
следует корректно вызывать внутри функции timeFutures
. Для меня важна важность выбора ExecutionContext, который лучше всего подходит для аппаратного обеспечения.
Ответ 3
Ответ ниже будет выполняться в течение 700 мс на 20-ядерном компьютере, который дает то, что нужно выполнить в последовательности, а также можно делать на любой машине с любой реализацией (20 параллельных вызовов 200 мс slowInt
, за которыми следуют 5 вложенных 100 мс slowAdd
). Он работает в 1600 мс на моем 4-ядерном компьютере, который также можно делать на этой машине.
Когда вызовы slowAdd
расширяются, f
представляет slowAdd
:
f(f(f(f(f(x1, x2), f(x3, x4)), f(f(x5, x6), f(x7, x8))), f(f(f(x9, x10), f(x11, x12)), f(f(x13, x14), f(x15, x16)))), f(f(x17, x18), f(x19, x20)))
Пример, который вы указали при использовании Future.sequence
, будет выполняться в 2100 мс на 20-ядерном компьютере (20 параллельных вызовов 200 мс slowInt
, за которыми следуют 19 вложенных вызовов 100 мс slowAdd
). Он работает в 2900 мс на моем 4-ядерном компьютере.
Когда вызовы slowAdd
расширяются, f
представляет slowAdd
:
f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(f(x1, x2), x3), x4), x5), x6), x7), x8), x9), x10), x11), x12), x13), x14), x15) x16) x17) x18) x19) x20)
Метод Future.reduce
вызывает Future.sequence(futures).map(_ reduceLeft op)
, поэтому два приведенных вами примера эквивалентны.
В моем ответе используется функция combine
, которая берет список фьючерсов и op
, функцию, которая объединяет два фьючерса в один как параметры. Функция возвращает op
применительно ко всем парам фьючерсов и парам пар и так далее до тех пор, пока не останется одно будущее:
def combine[T](list: List[Future[T]], op: (Future[T], Future[T]) => Future[T]): Future[T] =
if (list.size == 1) list.head
else if(list.size == 2) list.reduce(op)
else list.grouped(2).map(combine(_, op)).reduce(op)
Примечание. Я немного изменил код в соответствии с моими предпочтениями стиля.
def slowInt(i: Int): Future[Int] = Future { Thread.sleep(200); i }
def slowAdd(fx: Future[Int], fy: Future[Int]): Future[Int] = fx.flatMap(x => fy.map { y => Thread.sleep(100); x + y })
var futures: List[Future[Int]] = List.range(1, 21).map(slowInt)
В приведенном ниже коде используется функция combine
для вашего случая:
timeFuture(combine(futures, slowAdd))
Код ниже обновляет ваш пример Future.sequence
для моих модификаций:
timeFuture(Future.sequence(futures).map(_.reduce{(x, y) => Thread.sleep(100); x + y }))