Ответ 1
TL; DR:. Для версий Spark до 1.6, чтобы получить "контрольную точку DF", мое предложенное решение основано на другом ответе, но с одной дополнительной строкой:
df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed
Объяснение
Обновлено после дальнейших исследований.
Как указано, контрольная точка DataFrame напрямую отсутствует (Spark 1.6.1), хотя есть issue для это на Spark Jira.
Таким образом, возможное обходное решение - это предложение, предложенное в другом ответе:
df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint
Однако при таком подходе будет проверяться только объект df.rdd. Это можно проверить, позвонив toDebugString
в df.rdd
:
scala> df.rdd.toDebugString
(32) MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []
а затем вызывая toDebugString
после быстрого преобразования в df
(обратите внимание, что я создал свой DataFrame из источника JDBC), возвращает следующее:
scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
| MapPartitionsRDD[4] at rdd at <console>:38 []
| JDBCRDD[3] at rdd at <console>:38 []
df.explain
также показывает подсказку:
scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)
Итак, чтобы фактически получить "контрольно-контрольный" DataFrame, я могу думать только о создании нового из контрольного RDD:
val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map {
case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")
Затем мы можем проверить, что новый DataFrame "проверен":
1) newDF.explain
:
scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]
2) newDF.rdd.toDebugString
:
scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
| MapPartitionsRDD[8] at createDataFrame at <console>:37 []
| MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []
3) С преобразованием:
scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
| MapPartitionsRDD[11] at rdd at <console>:40 []
| MapPartitionsRDD[8] at createDataFrame at <console>:37 []
| MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []
Кроме того, я попробовал несколько более сложных преобразований, и на практике я смог проверить, что объект newDF
был проверен.
Таким образом, единственный способ, которым я нашел надежную контрольную точку DataFrame, был путем проверки связанного с ним RDD и создания из него нового объекта DataFrame.
Надеюсь, это поможет. Приветствия.