Ответ 1
Вы можете собирать уникальные значения состояния и просто отображать результирующий массив:
val states = df.select("State").distinct.collect.flatMap(_.toSeq)
val byStateArray = states.map(state => df.where($"State" <=> state))
или для отображения:
val byStateMap = states
.map(state => (state -> df.where($"State" <=> state)))
.toMap
То же самое в Python:
from itertools import chain
from pyspark.sql.functions import col
states = chain(*df.select("state").distinct().collect())
# PySpark 2.3 and later
# In 2.2 and before col("state") == state)
# should give the same outcome, ignoring NULLs
# if NULLs are important
# (lit(state).isNull() & col("state").isNull()) | (col("state") == state)
df_by_state = {state:
df.where(col("state").eqNullSafe(state)) for state in states}
Очевидная проблема заключается в том, что для каждого уровня требуется полное сканирование данных, поэтому это дорогостоящая операция. Если вы ищете способ просто разделить выход, см. Также Как разделить RDD на два или более RDD?
В частности, вы можете написать Dataset
, разделенный на интересующий столбец:
val path: String = ???
df.write.partitionBy("State").parquet(path)
и при необходимости верните сообщение:
// Depend on partition prunning
for { state <- states } yield spark.read.parquet(path).where($"State" === state)
// or explicitly read the partition
for { state <- states } yield spark.read.parquet(s"$path/State=$state")
В зависимости от размера данных, количества уровней разделения, хранения и уровня персистентности ввода он может быть быстрее или медленнее, чем несколько фильтров.