Ответ 1
Начальный ответ (одно предположение о временном ряду):
Прежде всего, попробуйте избежать функций окна, если вы не можете предоставить предложение PARTITION BY
. Он перемещает данные в один раздел, поэтому большую часть времени это просто невозможно.
Что вы можете сделать, это заполнить пробелы на RDD
с помощью mapPartitionsWithIndex
. Поскольку вы не представили пример данных или ожидаемого вывода, считайте это псевдокодом не реальной программой Scala:
-
сначала позволяет упорядочить
DataFrame
по дате и преобразовать вRDD
import org.apache.spark.sql.Row import org.apache.spark.rdd.RDD val rows: RDD[Row] = df.orderBy($"Date").rdd
-
next позволяет найти последнее не нулевое наблюдение за раздел
def notMissing(row: Row): Boolean = ??? val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows .mapPartitionsWithIndex{ case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) } .collectAsMap
-
и преобразуйте этот
Map
в широковещательную рассылкуval toCarryBd = sc.broadcast(toCarry)
-
окончательно перечислить разделы, снова заполнив пробелы:
def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { // If it is the beginning of partition and value is missing // extract value to fill from toCarryBd.value // Remember to correct for empty / only missing partitions // otherwise take last not-null from the current partition } val imputed: RDD[Row] = rows .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }
-
наконец, вернитесь к DataFrame
Изменить (секционированные/временные ряды по групповым данным):
Дьявол в деталях. Если ваши данные разбиты на разделы, тогда вся проблема может быть решена с помощью groupBy
. Предположим, вы просто разделите столбец "v" типа T
и Date
- это целая временная метка:
def fill(iter: List[Row]): List[Row] = {
// Just go row by row and fill with last non-empty value
???
}
val groupedAndSorted = df.rdd
.groupBy(_.getAs[T]("k"))
.mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)
Таким образом, вы можете одновременно заполнить все столбцы.
Можно ли это сделать с помощью DataFrames вместо преобразования взад и вперед в RDD?
Это зависит, хотя вряд ли это будет эффективно. Если максимальный зазор относительно невелик, вы можете сделать что-то вроде этого:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column
val maxGap: Int = ??? // Maximum gap between observations
val columnsToFill: List[String] = ??? // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed
// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
// Generate lag values between 1 and maxGap
val lags = (1 to maxGap).map(lag(col(c), _)over(w))
// Add current, coalesce and set alias
coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}
// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))
// Finally select
val dfImputed = df.select($"*" :: lags: _*)
Его можно легко настроить, чтобы использовать разный максимальный зазор на столбе.
Более простой способ добиться аналогичного результата в последней версии Spark - использовать last
с ignoreNulls
:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"k").orderBy($"Date")
.rowsBetween(Window.unboundedPreceding, -1)
df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
В то время как можно отказаться от предложения partitionBy
и применить этот метод глобально, это было бы слишком дорогостоящим с большими наборами данных.