Выполнить цикл foreach параллельно или последовательно задавать условие
Я часто получаю несколько вложенных циклов foreach
, а иногда при написании общих функций (например, для пакета) нет уровня, который очевидно для параллелизации. Есть ли способ выполнить то, что описывает макет ниже?
foreach(i = 1:I) %if(I < J) `do` else `dopar`% {
foreach(j = 1:J) %if(I >= J) `do` else `dopar`% {
# Do stuff
}
}
Кроме того, есть ли способ определить, зарегистрирован ли параллельный бэкэнд, чтобы я мог избежать ненужных предупреждающих сообщений? Это было бы полезно как при проверке пакетов до подачи CRAN, так и для того, чтобы не беспокоить пользователей, работающих R на одноядерных компьютерах.
foreach(i=1:I) %if(is.parallel.backend.registered()) `dopar` else `do`% {
# Do stuff
}
Спасибо за ваше время.
Изменить: Большое спасибо за отзывы всех разработчиков и разработчиков, и вы правы в том, что лучший способ справиться с приведенным выше примером - переосмыслить всю установку. Я бы предпочел что-то вроде ниже, чтобы идея triu
, но по сути это одна и та же точка. И это, конечно же, можно было бы сделать с помощью параллельного tapply
, например, предложенного Джорисом.
ij <- expand.grid(i=1:I, j=1:J)
foreach(i=ij$I, j=ij$J) %dopar% {
myFuction(i, j)
}
Однако, пытаясь упростить ситуацию, вызвавшую эту тему, я не рассмотрел некоторые важные детали. Представьте, что у меня есть две функции analyse
и batch.analyse
, и лучший уровень для распараллеливания может быть различным в зависимости от значений n.replicates
и n.time.points
.
analyse <- function(x, y, n.replicates=1000){
foreach(r = 1:n.replicates) %do% {
# Do stuff with x and y
}
}
batch.analyse <- function(x, y, n.replicates=10, n.time.points=1000){
foreach(tp = 1:time.points) %do% {
my.y <- my.func(y, tp)
analyse(x, my.y, n.replicates)
}
}
Если n.time.points > n.replicates
имеет смысл распараллеливаться в batch.analyse
, но в противном случае имеет смысл распараллеливаться в analyse
. Любые идеи о том, как справиться с этим? Как-то можно было бы обнаружить в analyse
, если распараллеливание уже произошло?
Ответы
Ответ 1
Проблема, которую вы поднимаете, была мотивацией для оператора nesting foreach,%:%. Если тело внутреннего цикла занимает значительное количество времени, вы довольно безопасно используете:
foreach(i = 1:I) %:%
foreach(j = 1:J) %dopar% {
# Do stuff
}
Это "разворачивает" вложенные циклы, в результате чего выполняются задачи (I * J), которые могут выполняться параллельно.
Если тело внутреннего цикла не занимает много времени, решение сложнее. Стандартное решение состоит в том, чтобы распараллелить внешний цикл, но это все равно может привести к множеству небольших задач (когда я большой, а J мало) или несколько больших задач (когда я мал, а J велико).
Мое любимое решение - использовать оператор вложенности с разделением задач. Вот полный пример использования бэкэнда doMPI:
library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
I <- 100; J <- 2
opt <- list(chunkSize=10)
foreach(i = 1:I, .combine='cbind', .options.mpi=opt) %:%
foreach(j = 1:J, .combine='c') %dopar% {
(i * j)
}
closeCluster(cl)
Это приводит к 20 "задачам", каждый из которых состоит из 10 вычислений тела цикла. Если вы хотите иметь единый кусок задачи для каждого рабочего, вы можете вычислить размер блока как:
cs <- ceiling((I * J) / getDoParWorkers())
opt <- list(chunkSize=cs)
К сожалению, не все параллельные серверы поддерживают разбиение задач. Кроме того, doMPI не поддерживает Windows.
Для получения дополнительной информации по этой теме, см. мою виньетку "Вложенные петли петли" в пакете foreach:
library(foreach)
vignette('nesting')
Ответ 2
Если вы закончите несколько вложенных циклов foreach, я переосмыслил бы мой подход. Использование параллельных версий tapply
может решить многие проблемы. В общем, вы не должны использовать вложенную распараллеливание, поскольку это ничего не приносит вам. Параллелизируйте внешний цикл и забудьте о внутреннем цикле.
Причина проста: если у вас есть 3 соединения в вашем кластере, внешний цикл допара будет использовать все три. Внутренняя петля допара не сможет использовать какие-либо дополнительные соединения, поскольку их нет. Таким образом, вы ничего не получаете. Следовательно, макет, который вы даете, не имеет никакого смысла с точки зрения программирования.
На ваш второй вопрос отвечает довольно легко функция getDoParRegistered()
, которая возвращает TRUE при регистрации бэкэнд и FALSE в противном случае. Обратите внимание:
- он также возвращает TRUE, если зарегистрирован последовательный бэкэнд (т.е. после вызова registerDoSEQ).
- Он вернет TRUE и после остановки кластера, но в этом случае% dopar% вернет ошибку.
например:
require(foreach)
require(doSNOW)
cl <- makeCluster(rep("localhost",2),type="SOCK")
getDoParRegistered()
[1] FALSE
registerDoSNOW(cl)
getDoParRegistered()
[1] TRUE
stopCluster(cl)
getDoParRegistered()
[1] TRUE
Но теперь выполняется этот код:
a <- matrix(1:16, 4, 4)
b <- t(a)
foreach(b=iter(b, by='col'), .combine=cbind) %dopar%
(a %*% b)
вернется с ошибкой:
Error in summary.connection(connection) : invalid connection
Вы можете создать дополнительную проверку. A (отвратительно уродливый) взлом, который вы можете использовать, чтобы проверить, что соединение, зарегистрированное doSNOW
, является допустимым, может быть:
isvalid <- function(){
if (getDoParRegistered() ){
X <- foreach:::.foreachGlobals$objs[[1]]$data
x <- try(capture.output(print(X)),silent=TRUE)
if(is(x,"try-error")) FALSE else TRUE
} else {
FALSE
}
}
Что вы можете использовать как
if(!isvalid()) registerDoSEQ()
Это будет зарегистрировать последовательный бэкэнд, если getDoParRegistered() возвращает TRUE, но действительного соединения с кластером больше нет. Но опять же, это хак, и я понятия не имею, работает ли он с другими бэкендами или даже с другими типами типов кластеров (я использую главным образом сокеты).
Ответ 3
В обратном порядке задаваемых вопросов:
-
@Joris правильно относится к проверке зарегистрированного параллельного бэкэнд. Однако обратите внимание, что существует различие между машиной, являющейся одноядерной, и независимо от того, зарегистрирован ли параллельный бэкэнд. Проверка # ядра - очень специфическая задача для платформы (операционной системы). В Linux это может сработать для вас:
CountUnixCPUs <- function(cpuinfo = "/proc/cpuinfo"){
tmpCmd <- paste("grep processor ", cpuinfo, " | wc -l", sep = "")
numCPU <- as.numeric(system(tmpCmd, intern = TRUE))
return(numCPU)
}
Изменить: см. ссылку @Joris на другую страницу ниже, которая дает советы для Windows и Linux. Я, скорее всего, переписаю свой собственный код, по крайней мере, чтобы добавить больше параметров для подсчета ядер.
-
Что касается вложенных циклов, я беру другой подход: я готовлю таблицу параметров и затем перебираю по строкам. Очень простой способ, например:
library(Matrix)
ptable <- which(triu(matrix(1, ncol = 20, nrow = 20))==1, arr.ind = TRUE)
foreach(ix_row = 1:nrow(ptable)) %dopar% { myFunction(ptable[ix_row,])}