Ответ 1
Как stefanobaghino, схема анализатора кэшируется после анализа, и оптимизатор не должен ее изменять.
Если вы используете Spark 2.2, вы можете воспользоваться SPARK-18127 и применить правило в Analyzer.
Если вы запустите это фиктивное приложение
package panos.bletsos
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.SparkSessionExtensions
case class ReorderColumnsOnProjectOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case p: Project => {
val fields = p.projectList
if (checkConditions(fields, p.child)) {
val modifiedFieldsObject = optimizePlan(fields, p.child, plan)
val projectUpdated = p.copy(modifiedFieldsObject, p.child)
projectUpdated
} else {
p
}
}
}
private def checkConditions(fields: Seq[NamedExpression], child: LogicalPlan): Boolean = {
// compare UDFs computation cost and return Boolean
val needsOptimization = listHaveTwoUDFsEnabledForOptimization(fields)
if (needsOptimization) println(fields.mkString(" | "))
needsOptimization
}
private def listHaveTwoUDFsEnabledForOptimization(fields: Seq[NamedExpression]): Boolean = {
// a simple priority order based on UDF name suffix
val myPriorityList = fields.map((e) => {
if (e.name.toString().startsWith("udf")) {
Integer.parseInt(e.name.toString().split("_")(1))
} else {
0
}
}).filter(e => e > 0)
// Do UDF with less cost before, so I need change the fields order
myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
}
private def optimizePlan(fields: Seq[NamedExpression],
child: LogicalPlan,
plan: LogicalPlan): Seq[NamedExpression] = {
// change order on field list. Return LogicalPlan modified
val myListWithUDF = fields.filter((e) => e.name.toString().startsWith("udf"))
if (myListWithUDF.size != 2) {
throw new UnsupportedOperationException(
s"The size of UDF list have ${myListWithUDF.size} elements.")
}
val myModifiedList: Seq[NamedExpression] = Seq(myListWithUDF(1), myListWithUDF(0))
val myListWithoutUDF = fields.filter((e) => !e.name.toString().startsWith("udf"))
val modifiedFielsObject = getFieldsReordered(myListWithoutUDF, myModifiedList)
val msg = "•••• optimizePlan called : " + fields.size + " columns on Project.\n" +
"•••• fields: " + fields.mkString(" | ") + "\n" +
"•••• UDFs to reorder:\n" + myListWithUDF.mkString(" | ") + "\n" +
"•••• field list Without UDF: " + myListWithoutUDF.mkString(" | ") + "\n" +
"•••• modifiedFielsObject: " + modifiedFielsObject.mkString(" | ") + "\n"
modifiedFielsObject
}
private def getFieldsReordered(fieldsWithoutUDFs: Seq[NamedExpression],
fieldsWithUDFs: Seq[NamedExpression]): Seq[NamedExpression] = {
fieldsWithoutUDFs.union(fieldsWithUDFs)
}
}
case class R0(x: Int,
p: Option[Int] = Some((new scala.util.Random).nextInt(999)),
q: Option[Int] = Some((new scala.util.Random).nextInt(999))
)
object App {
def main(args : Array[String]) {
type ExtensionsBuilder = SparkSessionExtensions => Unit
// inject the rule here
val f: ExtensionsBuilder = { e =>
e.injectResolutionRule(ReorderColumnsOnProjectOptimizationRule)
}
val spark = SparkSession
.builder()
.withExtensions(f)
.getOrCreate()
def createDsR0(spark: SparkSession): Dataset[R0] = {
import spark.implicits._
val ds = spark.range(3)
val xdsR0 = ds.map((i) => {
R0(i.intValue() + 1)
})
// IMPORTANT: The cache here is mandatory
xdsR0.cache()
}
val dsR0 = createDsR0(spark)
val udfA_99 = (p: Int) => Math.cos(p * p) // higher cost Function
val udfB_10 = (q: Int) => q + 1 // lower cost Function
println("*** I' going to register my UDF ***")
spark.udf.register("myUdfA", udfA_99)
spark.udf.register("myUdfB", udfB_10)
val dsR1 = {
val ret1DS = dsR0.selectExpr("x", "p", "q", "myUdfA(p) as udfA_99")
val result = ret1DS.cache()
dsR0.show()
result.show()
result
}
val dsR2 = {
val ret2DS = dsR1.selectExpr("x", "p", "q", "udfA_99", "myUdfB(p) as udfB_10")
val result = ret2DS.cache()
dsR0.show()
dsR1.show()
result.show()
result
}
}
}
он напечатает
+---+---+---+-------+-------------------+
| x| p| q|udfB_10| udfA_99|
+---+---+---+-------+-------------------+
| 1|392|746| 393|-0.7508388993643841|
| 2|778|582| 779| 0.9310990915956336|
| 3|661| 34| 662| 0.6523545972748773|
+---+---+---+-------+-------------------+