Scala & Spark: переработка SQL-операторов
Я потратил довольно много времени, чтобы закодировать несколько SQL-запросов, которые ранее использовались для извлечения данных для различных сценариев R
. Вот как это работает
sqlContent = readSQLFile("file1.sql")
sqlContent = setSQLVariables(sqlContent, variables)
results = executeSQL(sqlContent)
Ключ в том, что для некоторых запросов требуется результат из предыдущего запроса - почему создание VIEW
в самой базе данных не решает эту проблему. С помощью Spark 2.0
я уже выяснил способ сделать это через
// create a dataframe using a jdbc connection to the database
val tableDf = spark.read.jdbc(...)
var tempTableName = "TEMP_TABLE" + java.util.UUID.randomUUID.toString.replace("-", "").toUpperCase
var sqlQuery = Source.fromURL(getClass.getResource("/sql/" + sqlFileName)).mkString
sqlQuery = setSQLVariables(sqlQuery, sqlVariables)
sqlQuery = sqlQuery.replace("OLD_TABLE_NAME",tempTableName)
tableDf.createOrReplaceTempView(tempTableName)
var data = spark.sql(sqlQuery)
Но это очень смутное мнение. Кроме того, более сложные запросы, например. запросы, которые incooporate подзапроса факторинга в настоящее время не работают. Есть ли более надежный способ повторного внедрения кода SQL в код Spark.SQL
с помощью filter($"")
, .select($"")
и т.д.
Общая цель состоит в том, чтобы получить несколько org.apache.spark.sql.DataFrame
s, каждый из которых представляет результаты одного предыдущего SQL-запроса (который всегда несколько JOIN
s, WITH
s и т.д.). Итак, n
запросы, ведущие к n
DataFrame
s.
Есть ли лучший вариант, чем предоставленные два?
Настройка: Hadoop v.2.7.3
, Spark 2.0.0
, Intelli J IDEA 2016.2
, Scala 2.11.8
, Testcluster на рабочей станции Win7
Ответы
Ответ 1
Не особенно ясно, каково ваше требование, но я думаю, вы говорите, что у вас есть что-то вроде:
SELECT * FROM people LEFT OUTER JOIN places ON ...
SELECT * FROM (SELECT * FROM people LEFT OUTER JOIN places ON ...) WHERE age>20
и вы хотели бы объявить и выполнить это эффективно, как
SELECT * FROM people LEFT OUTER JOIN places ON ...
SELECT * FROM <cachedresult> WHERE age>20
Чтобы добиться этого, я бы увеличил входной файл, поэтому каждый оператор sql имеет связанное имя таблицы, в которое будет сохранен результат.
например.
PEOPLEPLACES\tSELECT * FROM people LEFT OUTER JOIN places ON ...
ADULTS=SELECT * FROM PEOPLEPLACES WHERE age>18
Затем выполните в цикле, например
parseSqlFile().foreach({case (name, query) => {
val data: DataFrame = execute(query)
data.createOrReplaceTempView(name)
}
Убедитесь, что вы запросили запросы, чтобы все необходимые таблицы были созданы. Другие делают немного больше разбора и сортировки по зависимостям.
В RDMS я бы назвал эти таблицы Materialized Views. то есть преобразование на другие данные, например представление, но с результатом, кэшированным для последующего повторного использования.