Собирать, только если запрос возвращает меньше строк n_max

Иногда при подключении к моей базе данных Oracle через ROracle и dbplyr я запускаю операцию dplyr::collect, которая извлекает больше данных, чем ожидалось, и может обрабатывать R.

Это может привести к сбою R и часто является признаком, который я должен был отфильтровать или агрегировать данные перед извлечением.

Было бы здорово проверить размер результата, прежде чем выбирать его или нет (без выполнения запроса дважды).

Назовите collect2 вариант collect, который позволит это:

ожидаемое поведение:

small_t <- con %>% tbl("small_table") %>%
  filter_group_etc %>%
  collect2(n_max = 5e6) # works fine

big_t   <- con %>% tbl("big_table")   %>%
  filter_group_etc %>%
  collect2(n_max = 5e6) # Error: query returned 15.486.245 rows, n_max set to 5.000.000

Возможно ли это?

Я также открыт для решения с помощью ROracle/DBI без dplyr, например:

dbGetQuery2(con, my_big_sql_query,n_max = 5e6) # Error: query returned 15.486.245 rows, n_max set to 5.000.000

EDIT:

См. ниже частичное решение, отправленное как ответ, но не оптимальное, потому что какое-то время тратится на извлечение данных, для которых я не нужен.

Ответы

Ответ 1

Фактически вы можете достичь своей цели в одном SQL-запросе:

Добавьте число строк (n) в качестве дополнительного столбца к данным, используя dplyr mutate, а не суммируйте, а затем установите n < n_limit как условие фильтра. Это условие соответствует условию наличия в SQL. Если количество строк больше, чем список, то данные не собираются. В противном случае собираются все данные. Возможно, вы захотите в конце столбца счетчика строк.

Этот подход должен работать на большинстве баз данных. Я проверил это, используя PostgreSQL и Oracle.

copy_to(dest=con, cars, "cars")
df <- tbl(con, "cars")
n_limit <- 51
df %>% mutate(n=n()) %>% filter(n < n_limit) %>% collect

Однако он не работает на SQLite. Чтобы понять, почему это так, вы можете проверить инструкцию SQL, сгенерированную кодом dplyr:

df %>% mutate(n=n()) %>% filter(n < n_limit) %>% show_query

<SQL>
SELECT *
FROM (SELECT "speed", "dist", COUNT(*) OVER () AS "n"
FROM "cars") "rdipjouqeu"
WHERE ("n" < 51.0)

SQL содержит функцию окна (count(*) over ()), которая не поддерживается SQLite.

Ответ 2

Это не обойти проблему, о которой вы упоминаете в комментариях о расходовании ресурсов, чтобы получить запрос дважды, но он, похоже, работает (по крайней мере, против моей базы данных MySQL - у меня нет базы данных Oracle для проверьте его):

collect2  <- function(query, limit = 20000) {

  query_nrows  <- query %>% 
    ungroup() %>% 
    summarize(n = n()) %>% 
    collect() %>% 
    pull('n')


  if(query_nrows <= limit) {
    collect(query)
  } else {
    warning("Query has ", query_nrows,"; limit is ", limit,". Data will not be collected.")
  }

}

Я не вижу возможности проверить количество строк в результатах запроса без фактического выполнения запроса. Тем не менее, с помощью этого метода вы всегда вынуждаете вычисление номеров строк в базе данных сначала и отказываетесь собирать, если вам больше 20 000 (или независимо от вашего ограничения по строкам).

Ответ 3

Я отправлю частичный ответ, проблема с этим решением заключается в том, что он будет извлекать первые строки n_max, когда запрос возвращает больше, чем n_max rows. Извлечение требует времени с моей конфигурацией, поэтому я предпочел бы избежать этого шага.

С другой стороны, он вернет ошибку, которую я запросил, и запрос не нужно отправлять дважды, когда n_rows < n_max.

dbplyr:::collect.tbl_sql недокументирован, но имеет параметры n и warn_incomplete, которые, похоже, будут делать то, что мы хотим. К сожалению, функция не работает (возможно, почему она недокументирована и не экспортируется), но мы сможем немного ее использовать.

# formals(dbplyr:::collect.tbl_sql)
# $x
# 
#
# $...
# 
# 
# $n
# [1] Inf
# 
# $warn_incomplete
# [1] TRUE

Моя личная проблема связана с базой данных Oracle, однако ее проще реплицировать с помощью базы данных SQLite, и я считаю, что решение будет работать с любой СУБД, совместимой с DBI

Инициировать и создавать поддельные данные

Это создаст файл с именем Test.sqlite в вашем рабочем каталоге.

library(dplyr)
library(dbplyr)
library(RSQLite)
library(DBI)
set.seed(1)
big_iris <- sample_n(iris,50000,replace=TRUE)
con <- DBI::dbConnect(RSQLite::SQLite(), dbname="Test.sqlite")
DBI::dbWriteTable(con,"BIG_IRIS",rename_all(big_iris,. %>% toupper %>% sub("\\.","_",.)),overwrite=T)
rm(big_iris)

Кровавое отверстие

Предположим, что мой R может обрабатывать только 20 000 строк, требующих отмены

big_iris_filtered <- con %>% tbl("BIG_IRIS") %>% filter(SEPAL_LENGTH > 5.2) %>% 
  collect
nrow(big_iris_filtered) # [1] 35041 <- that too much
big_iris_filtered <- con %>% tbl("BIG_IRIS") %>% filter(SEPAL_LENGTH > 5.2) %>% 
  collect(n=2e4,warn_incomplete=TRUE)
nrow(big_iris_filtered) # [1] 20000

Он останавливается на уровне 20 000, что является началом, но параметр warn_incomplete, похоже, ничего не вызывает.

Это имеет смысл, когда мы смотрим на код для collect:

dbplyr:::collect.tbl_sql
# function (x, ..., n = Inf, warn_incomplete = TRUE) 
# {
#   assert_that(length(n) == 1, n > 0L)
#   if (n == Inf) {
#     n <- -1
#   }
#   else {
#     x <- head(x, n)
#   }
#   sql <- db_sql_render(x$src$con, x)
#   out <- db_collect(x$src$con, sql, n = n, warn_incomplete = warn_incomplete)
#   grouped_df(out, intersect(op_grps(x), names(out)))
# } 

head здесь вызывается метод dbplyr:::head.tbl_lazy, который добавит компонент LIMIT в код SQLite (и, вероятно, сделает его аварийным с Oracle, поскольку head не был установлен по умолчанию в последний раз я проверено). Вы можете отлаживать и проверять значение sql, чтобы увидеть его.

Добавляя этот компонент LIMIT, мы все извлекаем по определению, поэтому warn_incomplete не может вызывать предупреждение по строке (которую он должен выполнять через dbplyr:::res_warn_incomplete)

Так что просто удалите часть:

collect2 <- function (x, ..., n = Inf, warn_incomplete = TRUE) {
  assertthat::assert_that(length(n) == 1, n > 0L)
  sql <- dbplyr:::db_sql_render(x$src$con, x)
  out <- dbplyr:::db_collect(x$src$con, sql, n = n, warn_incomplete = warn_incomplete)
  grouped_df(out, intersect(dbplyr:::op_grps(x), names(out)))
}

И тест:

big_iris_filtered <- con %>% tbl("BIG_IRIS") %>% filter(SEPAL_LENGTH > 5.2) %>% 
  collect2(n=2e4,warn_incomplete=FALSE)
# no warning as expected
big_iris_filtered <- con %>% tbl("BIG_IRIS") %>% filter(SEPAL_LENGTH > 5.2) %>% 
  collect2(n=2e4,warn_incomplete=TRUE)
# Warning message:
# Only first 20,000 results retrieved. Use n = Inf to retrieve all.
dim(big_iris_filtered)
[1] 20000     5

Это работает!

Мне нужно еще одно редактирование, потому что я хотел, чтобы он остановился, а не предупреждать, см. ниже.

Решение

collect2 <- function (x, ..., n = Inf, warn_incomplete = TRUE) {
  assertthat::assert_that(length(n) == 1, n > 0L)
  sql <- dbplyr:::db_sql_render(x$src$con, x)
  out <- withCallingHandlers(dbplyr:::db_collect(x$src$con, sql, n = n, warn_incomplete = warn_incomplete),warning=function(w) {stop(w)})
  grouped_df(out, intersect(dbplyr:::op_grps(x), names(out)))
}

big_iris_filtered <- con %>% tbl("BIG_IRIS") %>% filter(SEPAL_LENGTH > 5.2) %>% 
  collect2(n=2e4,warn_incomplete=TRUE)
# Error: Only first 20,000 results retrieved. Use n = Inf to retrieve all.

Хорошо, так что мое частичное решение. Частично, потому что я извлекал эти строки перед остановкой, поэтому запрос выполнялся на сервере И был доставлен клиенту до n-й строки, а затем вызвал ошибку. С моей конфигурацией выборка происходит медленно, и я бы предпочел бы избежать выборки для ничего.

следующая задача

Следующей задачей было бы остановить ее до того, как ее принесли. Код из db_collect показывает, как это происходит в настоящий момент, проблема заключается в том, что результирующий набор res обновляется dbFetch, который одновременно выполняет запрос и выборки в R.

dbplyr:::db_collect.DBIConnection
# function (con, sql, n = -1, warn_incomplete = TRUE, ...) 
# {
#   res <- dbSendQuery(con, sql)
#   tryCatch({
#     out <- dbFetch(res, n = n)
#     if (warn_incomplete) {
#       res_warn_incomplete(res, "n = Inf")
#     }
#   }, finally = {
#     dbClearResult(res)
#   })
#   out
# }