Каков самый простой способ распараллеливать векторную функцию в R?
У меня есть очень большой список X
и векторизованная функция f
. Я хочу рассчитать f(X)
, но это займет много времени, если я сделаю это с помощью одного ядра. У меня есть доступ к 48-ядерному серверу. Каков самый простой способ распараллеливать вычисление f(X)
? Не правильный ответ:
library(foreach)
library(doMC)
registerDoMC()
foreach(x=X, .combine=c) %dopar% f(x)
Приведенный выше код действительно распараллеливает вычисление f(X)
, но он будет делать это, применяя f
отдельно к каждому элементу X
. Это игнорирует векторизованную природу f
и, вероятно, в результате будет медленнее, а не быстрее. Вместо того, чтобы применять f
elementwise к X
, я хочу разделить X
на куски разумного размера и применить к ним теги f
.
Итак, должен ли я просто разделить X
на 48 подмножеств с равным размером, а затем применить f
к каждому из них параллельно, а затем вручную собрать результат? Или есть пакет, предназначенный для этого?
В случае, если кто-то задается вопросом, мой конкретный вариант использования здесь.
Ответы
Ответ 1
Пакет itertools был разработан для решения этой проблемы. В этом случае я бы использовал isplitVector
:
n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)
В этом примере pvec
, несомненно, быстрее и проще, но это можно использовать, например, в Windows с пакетом doParallel.
Ответ 2
Хотя это старый вопрос, это может быть интересно для всех, кто наткнулся на это через google (например, я): посмотрите на функцию pvec
в пакете multicore
. Я думаю, он делает именно то, что вы хотите.
Ответ 3
Вот моя реализация. Это функция chunkmap
, которая принимает
векторная функция, список аргументов, которые должны быть векторизованы,
и список аргументов, которые не должны быть векторизованы (т.
константы) и возвращает тот же результат, что и вызов функции на
аргументы напрямую, за исключением того, что результат вычисляется параллельно.
Для функции f
векторные аргументы v1
, v2
, v3
и скалярные
аргументы s1
, s2
, следующее должно возвращать идентичные результаты:
f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))
Так как функция chunkapply
не знает, что
аргументы f
векторизованы, а какие нет, вам решать
укажите, когда вы его вызываете, иначе вы получите неправильные результаты. Вы
обычно должны указывать ваши аргументы, чтобы убедиться, что они связаны
правильно.
library(foreach)
library(iterators)
# Use your favorite doPar backend here
library(doMC)
registerDoMC()
get.chunk.size <- function(vec.length,
min.chunk.size=NULL, max.chunk.size=NULL,
max.chunks=NULL) {
if (is.null(max.chunks)) {
max.chunks <- getDoParWorkers()
}
size <- vec.length / max.chunks
if (!is.null(max.chunk.size)) {
size <- min(size, max.chunk.size)
}
if (!is.null(min.chunk.size)) {
size <- max(size, min.chunk.size)
}
num.chunks <- ceiling(vec.length / size)
actual.size <- ceiling(vec.length / num.chunks)
return(actual.size)
}
ichunk.vectors <- function(vectors=NULL,
min.chunk.size=NULL,
max.chunk.size=NULL,
max.chunks=NULL) {
## Calculate number of chunks
recycle.length <- max(sapply(vectors, length))
actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
num.chunks <- ceiling(recycle.length / actual.chunk.size)
## Make the chunk iterator
i <- 1
it <- idiv(recycle.length, chunks=num.chunks)
nextEl <- function() {
n <- nextElem(it)
ix <- seq(i, length = n)
i <<- i + n
vchunks <- foreach(v=vectors) %do% v[1+ (ix-1) %% length(v)]
names(vchunks) <- names(vectors)
vchunks
}
obj <- list(nextElem = nextEl)
class(obj) <- c("ichunk", "abstractiter", "iter")
obj
}
chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
## Check that the arguments make sense
stopifnot(is.list(VECTOR.ARGS))
stopifnot(length(VECTOR.ARGS) >= 1)
stopifnot(is.list(SCALAR.ARGS))
## Choose appropriate combine function
if (MERGE) {
combine.fun <- append
} else {
combine.fun <- foreach:::defcombine
}
## Chunk and apply, and maybe merge
foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
.combine=combine.fun,
.options.multicore = mcoptions) %dopar%
{
do.call(FUN, args=append(vchunk, SCALAR.ARGS))
}
}
## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
if (getDoParWorkers() > 1) {
chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
} else {
do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
}
}
Вот несколько примеров, показывающих, что chunkapply(f,list(x))
дает идентичные результаты f(x)
. Я установил max.chunk.size крайне мало, чтобы гарантировать, что алгоритм коммутации фактически используется.
> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE
> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<cutoff])
[1] TRUE
Если у кого-то есть лучшее имя, чем "chunkapply", предложите его.
Изменить:
Как указывает еще один ответ, в многоядерном pacakge есть функция, называемая pvec
, которая имеет очень сходную функциональность с тем, что я написал. Для простых случаев вы должны это сделать, и вы должны проголосовать за Jonas Rauch за это. Однако моя функция немного более общая, поэтому, если к вам относится какое-либо из следующих действий, вы можете вместо этого использовать мою функцию:
- Вам нужно использовать параллельный сервер, отличный от многоядерного (например, MPI). Моя функция использует foreach, поэтому вы можете использовать любую инфраструктуру распараллеливания, которая обеспечивает бэкэнд для foreach.
- Вам нужно передать несколько векторизованных аргументов.
pvec
только векторизовать по одному аргументу, поэтому вы не могли бы легко реализовать параллельное векторное добавление с помощью pvec
, например. Моя функция позволяет указать произвольные аргументы.
Ответ 4
Map-Reduce может быть тем, что вы ищете; это портировано в R
Ответ 5
Как насчет чего-то подобного? R будет использовать все доступную память, а multicore
будет распараллеливаться по всем доступным ядрам.
library(multicore)
result = mclapply(X, function,mc.preschedule=FALSE, mc.set.seed=FALSE)