Как читать вложенную коллекцию в Spark
У меня есть таблица паркета с одним из столбцов
array < struct < col1, col2,.. colN →
Может запускать запросы к этой таблице в Hive с использованием синтаксиса LATERAL VIEW.
Как прочитать эту таблицу в RDD и, что еще важнее, как фильтровать, сопоставлять и т.д. эту вложенную коллекцию в Spark?
Не удалось найти ссылки на это в документации Spark. Заранее благодарим за любую информацию!
пс. Войлок может быть полезен, чтобы дать некоторую статистику на столе.
Количество столбцов в основной таблице ~ 600. Количество строк ~ 200 м.
Количество "столбцов" в вложенной коллекции ~ 10. Среднее количество записей в вложенной коллекции ~ 35.
Ответы
Ответ 1
В случае вложенной коллекции нет магии. Spark будет работать так же, как RDD[(String, String)]
и RDD[(String, Seq[String])]
.
Чтение такой вложенной коллекции из файлов Parquet может быть сложным.
Возьмем пример из spark-shell
(1.3.1):
scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Inner(a: String, b: String)
defined class Inner
scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer
Напишите файл паркета:
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
scala> outers.toDF.saveAsParquetFile("outers.parquet")
Прочтите файл паркета:
scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row
scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]
scala> val outers = dataFrame.map { row =>
| val key = row.getString(0)
| val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
| Outer(key, inners)
| }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
Важная часть row.getAs[Seq[Row]](1)
. Внутреннее представление вложенной последовательности struct
равно ArrayBuffer[Row]
, вы можете использовать любой супертип, а не Seq[Row]
. 1
- это индекс столбца во внешней строке. Я использовал метод getAs
здесь, но есть альтернативы в последних версиях Spark. См. Исходный код Значение строки.
Теперь, когда у вас есть RDD[Outer]
, вы можете применить любое желаемое преобразование или действие.
// Filter the outers
outers.filter(_.inners.nonEmpty)
// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
Обратите внимание, что мы использовали библиотеку spark-SQL только для чтения файла паркета. Вы можете, например, выбрать только нужные столбцы непосредственно в DataFrame, прежде чем сопоставлять их с RDD.
dataFrame.select('col1, 'col2).map { row => ... }
Ответ 2
Я дам ответ на основе Python с тех пор, что я использую. Я думаю, что Scala имеет что-то подобное.
Функция explode
была добавлена в Spark 1.4.0 для обработки вложенных массивов в DataFrames в соответствии с документами API Python.
Создайте тестовый фрейм данных:
from pyspark.sql import Row
df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()
## +-+--------------------+
## |a| intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+
Используйте explode
, чтобы сгладить столбец списка:
from pyspark.sql.functions import explode
df.select(df.a, explode(df.intlist)).show()
## +-+---+
## |a|_c0|
## +-+---+
## |1| 1|
## |1| 2|
## |1| 3|
## |2| 4|
## |2| 5|
## |2| 6|
## +-+---+
Ответ 3
Другим подходом будет использование сопоставления шаблонов следующим образом:
val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match {
case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
case List(a:String, b: String) => (a, b)
}).toList
})
Вы можете сопоставлять совпадение непосредственно на строке, но, скорее всего, это может быть неудачно по нескольким причинам.
Ответ 4
Выше ответы - это отличные ответы и решать этот вопрос с разных сторон; Spark SQL также является весьма полезным способом доступа к вложенным данным.
Здесь пример использования метода explode() в SQL непосредственно для запроса вложенной коллекции.
SELECT hholdid, tsp.person_seq_no
FROM ( SELECT hholdid, explode(tsp_ids) as tsp
FROM disc_mrt.unified_fact uf
)
tsp_ids - это вложенная структура, которая имеет много атрибутов, включая person_seq_no, которые я выбираю во внешнем запросе выше.
Выше было протестировано в Spark 2.0. Я сделал небольшой тест, и он не работает в Spark 1.6. Этот вопрос задавали, когда Spark 2 не был рядом, поэтому этот ответ добавляет красиво в список доступных вариантов для работы с вложенными структурами.
Отказоустойчивые неразрешенные JIRA на explode() для доступа SQL: