Data.table и параллельные вычисления

После этого сообщения: multicore и data.table в R, мне было интересно, есть ли способ использовать все ядра при использовании data.table, обычно делая вычисления по группам можно распараллелить. Кажется, что plyr допускает такие операции по дизайну.

Ответы

Ответ 1

Первое, что нужно проверить, - это то, что data.table FAQ 3.1 точка 2 погрузилась в:

Одно распределение памяти выполняется только для самой большой группы, тогда память используется для других групп. Очень мало мусора собирать.

Это одна из причин, по которой группировка данных. Но этот подход не поддается распараллеливанию. Параллелизация означает копирование данных на другие потоки, вместо этого, расчетное время. Но я понимаю, что группировка data.table обычно быстрее, чем plyr с .parallel. Это зависит от времени вычисления задачи для каждой группы, и если это время вычисления можно легко уменьшить или нет. Часто перемещается информация (при контроле 1 или 3 прогона больших задач с данными).

Чаще всего до сих пор на самом деле было получено несколько запросов, которые кусают выражение j [.data.table. Например, в последнее время мы увидели слабую производительность в группе data.table, но виновник оказался min(POSIXct) (Агрегация в R более чем 80K уникальных идентификаторов). Избегая того, что gotcha дал более 50-кратное ускорение.

Итак, мантра: Rprof, Rprof, Rprof.

Кроме того, пункт 1 из тех же часто задаваемых вопросов может быть значительным:

Только этот столбец сгруппирован, остальные 19 игнорируются, потому что data.table проверяет выражение j и понимает, что не использует другие столбцы.

Итак, data.table действительно не соответствует парадигме split-apply-comb вообще. Он работает по-другому. split-apply-comb предоставляет возможность распараллеливания, но на самом деле не масштабируется до больших данных.

Также см. сноску 3 в инфо-виньетке data.table:

Мы задаемся вопросом, сколько людей используют параллельные методы для кодирования это векторное сканирование

То, что пытается сказать "уверен, что параллель значительно быстрее, но как долго это нужно делать с эффективным алгоритмом?".

НО, если вы профилировали (используя Rprof), и задача для каждой группы действительно интенсивна в вычислении, то 3 сообщения о datatable-help, включая слово "multicore", могут помочь:

многоядерные сообщения о datatable-help

Конечно, есть много задач, в которых распараллеливание было бы неплохо в data.table, и есть способ сделать это. Но это еще не сделано, так как обычно другие факторы укуса, поэтому он был низким приоритетом. Если вы можете опубликовать воспроизводимые фиктивные данные с помощью тестов и результатов Rprof, это поможет увеличить приоритет.

Ответ 2

Я провел несколько тестов на @matt dowle до мантры Rprof, Rprof, Rprof.

Я нахожу, что решение распараллеливаться зависит от контекста; но, вероятно, является значительным. В зависимости от тестовых операций (например, foo ниже, которые могут быть настроены) и количества используемых сердечников (я стараюсь как 8, так и 24), я получаю разные результаты.

Ниже результатов:

  • используя 8 ядер, я вижу улучшение 21% в этом примере для распараллеливания
  • используя 24 ядра, я вижу 14% улучшение.

Я также рассматриваю некоторые реальные данные (операции с несовместимыми данными), которые показывают более крупные (33% или 25%, два разных теста) улучшение паралеллинга с 24 ядрами

R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods
[8] base

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2          data.table_1.10.4

R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5          data.table_1.10.4     

Пример ниже:

library(data.table)
library(stringi)
library(microbenchmark)

set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)

my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)

foo <- function(dt) {
  dt2 <- dt ## needed for .SD lock
  nr <- nrow(dt2)

  idx <- sample.int(nr, 1, replace=FALSE)

  dt2[idx,][, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]


  return(dt2[idx,])
}

split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}

foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps")

  require(parallel)
  cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))

  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)

  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}

print(parallel::detectCores()) # 8

microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)

Unit: seconds
     expr      min       lq     mean   median       uq      max neval cld
   serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387    10   b
 parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257    10  a 

print(parallel::detectCores()) # 24

Unit: seconds
     expr       min        lq     mean   median       uq      max neval cld
   serial  9.014247  9.804112 12.17843 13.17508 13.56914 14.13133    10   a
 parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353    10   a

Профилирование:

Мы можем использовать этот ответ, чтобы предоставить более прямой ответ на исходный комментарий @matt для dowle для профилирования.

В результате мы видим, что большинство времени вычисления обрабатывается base, а не data.table. data.table сами операции, как и ожидалось, являются исключительно быстрыми. Хотя некоторые могут утверждать, что это свидетельствует о том, что нет необходимости в parallelism внутри data.table, я полагаю, что этот рабочий процесс/набор операций не является нетипичным. То есть, это мое сильное подозрение, что большая часть большой агрегации data.table включает значительное количество кода не data.table; и что это соотносится с интерактивным использованием и развитием/производством. Поэтому я заключаю, что parallelism будет ценным в data.table для больших агрегаций.

library(profr)

prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                       simplify = FALSE)

pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
  fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
  pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}

sort(sapply(fun_timing, sum)) #  no large outliers

fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
  ret <- data.table(fun= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
  ret <- data.table(pkg= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

fun_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "fun"][
  order(avg_time, decreasing = TRUE),][1:10,]

pkg_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "pkg"][
  order(avg_time, decreasing = TRUE),]

Результаты:

                      fun total_time avg_time avg_pct
 1:               base::[    670.362  6.70362  0.2694
 2:      NA::[.data.table    667.350  6.67350  0.2682
 3:       .GlobalEnv::foo    335.784  3.35784  0.1349
 4:              base::[[    163.044  1.63044  0.0655
 5:   base::[[.data.frame    133.790  1.33790  0.0537
 6:            base::%in%    120.512  1.20512  0.0484
 7:        base::sys.call     86.846  0.86846  0.0348
 8: NA::replace_dot_alias     27.824  0.27824  0.0112
 9:           base::which     23.536  0.23536  0.0095
10:          base::sapply     22.080  0.22080  0.0089

          pkg total_time avg_time avg_pct
1:       base   1397.770 13.97770  0.7938
2: .GlobalEnv    335.784  3.35784  0.1908
3: data.table     27.262  0.27262  0.0155

crossposted в github/data.table