Data.table и параллельные вычисления
После этого сообщения: multicore и data.table в R, мне было интересно, есть ли способ использовать все ядра при использовании data.table, обычно делая вычисления по группам можно распараллелить. Кажется, что plyr
допускает такие операции по дизайну.
Ответы
Ответ 1
Первое, что нужно проверить, - это то, что data.table
FAQ 3.1 точка 2 погрузилась в:
Одно распределение памяти выполняется только для самой большой группы, тогда память используется для других групп. Очень мало мусора собирать.
Это одна из причин, по которой группировка данных. Но этот подход не поддается распараллеливанию. Параллелизация означает копирование данных на другие потоки, вместо этого, расчетное время. Но я понимаю, что группировка data.table
обычно быстрее, чем plyr
с .parallel
. Это зависит от времени вычисления задачи для каждой группы, и если это время вычисления можно легко уменьшить или нет. Часто перемещается информация (при контроле 1 или 3 прогона больших задач с данными).
Чаще всего до сих пор на самом деле было получено несколько запросов, которые кусают выражение j
[.data.table
. Например, в последнее время мы увидели слабую производительность в группе data.table
, но виновник оказался min(POSIXct)
(Агрегация в R более чем 80K уникальных идентификаторов). Избегая того, что gotcha дал более 50-кратное ускорение.
Итак, мантра: Rprof
, Rprof
, Rprof
.
Кроме того, пункт 1 из тех же часто задаваемых вопросов может быть значительным:
Только этот столбец сгруппирован, остальные 19 игнорируются, потому что data.table проверяет выражение j и понимает, что не использует другие столбцы.
Итак, data.table
действительно не соответствует парадигме split-apply-comb вообще. Он работает по-другому. split-apply-comb предоставляет возможность распараллеливания, но на самом деле не масштабируется до больших данных.
Также см. сноску 3 в инфо-виньетке data.table:
Мы задаемся вопросом, сколько людей используют параллельные методы для кодирования это векторное сканирование
То, что пытается сказать "уверен, что параллель значительно быстрее, но как долго это нужно делать с эффективным алгоритмом?".
НО, если вы профилировали (используя Rprof
), и задача для каждой группы действительно интенсивна в вычислении, то 3 сообщения о datatable-help, включая слово "multicore", могут помочь:
многоядерные сообщения о datatable-help
Конечно, есть много задач, в которых распараллеливание было бы неплохо в data.table, и есть способ сделать это. Но это еще не сделано, так как обычно другие факторы укуса, поэтому он был низким приоритетом. Если вы можете опубликовать воспроизводимые фиктивные данные с помощью тестов и результатов Rprof, это поможет увеличить приоритет.
Ответ 2
Я провел несколько тестов на @matt dowle до мантры Rprof, Rprof, Rprof.
Я нахожу, что решение распараллеливаться зависит от контекста; но, вероятно, является значительным. В зависимости от тестовых операций (например, foo
ниже, которые могут быть настроены) и количества используемых сердечников (я стараюсь как 8, так и 24), я получаю разные результаты.
Ниже результатов:
- используя 8 ядер, я вижу улучшение 21% в этом примере для распараллеливания
- используя 24 ядра, я вижу 14% улучшение.
Я также рассматриваю некоторые реальные данные (операции с несовместимыми данными), которые показывают более крупные (33%
или 25%
, два разных теста) улучшение паралеллинга с 24 ядрами
R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods
[8] base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4
R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4
Пример ниже:
library(data.table)
library(stringi)
library(microbenchmark)
set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)
my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)
foo <- function(dt) {
dt2 <- dt ## needed for .SD lock
nr <- nrow(dt2)
idx <- sample.int(nr, 1, replace=FALSE)
dt2[idx,][, `:=` (
new_var1= V1 / V2,
new_var2= V4 * V3 / V10,
new_var3= sum(V12),
new_var4= ifelse(V10 > 0, V11 / V13, 1),
new_var5= ifelse(V9 < 0, V8 / V18, 1)
)]
return(dt2[idx,])
}
split_df <- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
foo2 <- function(dt) {
dt2 <- split_df(dt, "grps")
require(parallel)
cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
clusterExport(cl, varlist= "foo")
clusterExport(cl, varlist= "dt2", envir = environment())
clusterEvalQ(cl, library("data.table"))
dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
print(parallel::detectCores()) # 8
microbenchmark(
serial= dt[,foo(.SD), by= "grps"],
parallel= foo2(dt),
times= 10L
)
Unit: seconds
expr min lq mean median uq max neval cld
serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b
parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a
print(parallel::detectCores()) # 24
Unit: seconds
expr min lq mean median uq max neval cld
serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a
parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a
Профилирование:
Мы можем использовать этот ответ, чтобы предоставить более прямой ответ на исходный комментарий @matt для dowle для профилирования.
В результате мы видим, что большинство времени вычисления обрабатывается base
, а не data.table
. data.table
сами операции, как и ожидалось, являются исключительно быстрыми. Хотя некоторые могут утверждать, что это свидетельствует о том, что нет необходимости в parallelism внутри data.table
, я полагаю, что этот рабочий процесс/набор операций не является нетипичным. То есть, это мое сильное подозрение, что большая часть большой агрегации data.table
включает значительное количество кода не data.table
; и что это соотносится с интерактивным использованием и развитием/производством. Поэтому я заключаю, что parallelism будет ценным в data.table
для больших агрегаций.
library(profr)
prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
simplify = FALSE)
pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}
sort(sapply(fun_timing, sum)) # no large outliers
fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
ret <- data.table(fun= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
ret <- data.table(pkg= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
fun_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "fun"][
order(avg_time, decreasing = TRUE),][1:10,]
pkg_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "pkg"][
order(avg_time, decreasing = TRUE),]
Результаты:
fun total_time avg_time avg_pct
1: base::[ 670.362 6.70362 0.2694
2: NA::[.data.table 667.350 6.67350 0.2682
3: .GlobalEnv::foo 335.784 3.35784 0.1349
4: base::[[ 163.044 1.63044 0.0655
5: base::[[.data.frame 133.790 1.33790 0.0537
6: base::%in% 120.512 1.20512 0.0484
7: base::sys.call 86.846 0.86846 0.0348
8: NA::replace_dot_alias 27.824 0.27824 0.0112
9: base::which 23.536 0.23536 0.0095
10: base::sapply 22.080 0.22080 0.0089
pkg total_time avg_time avg_pct
1: base 1397.770 13.97770 0.7938
2: .GlobalEnv 335.784 3.35784 0.1908
3: data.table 27.262 0.27262 0.0155
crossposted в github/data.table