Создать новый столбец с функцией в Spark Dataframe
Я пытаюсь выяснить новый API-интерфейс DataFrame в Spark. кажется хорошим шагом вперед, но проблема с тем, что должно быть довольно простым. У меня есть dataframe с 2 столбцами, "ID" и "Amount". Как общий пример, скажем, я хочу вернуть новый столбец с именем "code", который возвращает код на основе значения "Amt". Я могу написать functiin что-то вроде этого:
def coder(myAmt:Integer):String {
if (myAmt > 100) "Little"
else "Big"
}
Когда я пытаюсь использовать его так:
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", coder(myDF("Amt")))
Я получаю ошибки несоответствия типа
found : org.apache.spark.sql.Column
required: Integer
Я попытался изменить тип ввода в своей функции на org.apache.spark.sql.Column, но затем я начал получать ошибки при компиляции функции, потому что он хочет иметь логическое выражение в выражении if.
Я делаю это неправильно? Есть ли лучший/другой способ сделать это, чем использовать withColumn?
Спасибо за вашу помощь.
Ответы
Ответ 1
Скажем, у вас есть столбец "Amt" в вашей схеме:
import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))
Я думаю, что withColumn - это правильный способ добавить столбец
Ответ 2
Нам следует избегать определения функций udf
как можно больше из-за его накладных расходов столбцов serialization
и deserialization
.
Вы можете достичь решения с помощью простой when
искровой функции, как показано ниже
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))