Ответ 1
Вы можете сделать это, по крайней мере, для фильтрации, если хотите использовать интегрированный язык.
Для файла данных date.txt, содержащего:
one,2014-06-01
two,2014-07-01
three,2014-08-01
four,2014-08-15
five,2014-09-15
В своем UDF вы можете упаковать как можно больше магии даты Scala, но я буду держать ее просто:
def myDateFilter(date: String) = date contains "-08-"
Задайте все, как показано ниже: это много из Руководство по программированию.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
// case class for your records
case class Entry(name: String, when: String)
// read and parse the data
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1)))
Вы можете использовать UDF как часть вашего предложения WHERE:
val augustEntries = entries.where('when)(myDateFilter).select('name, 'when)
и посмотреть результаты:
augustEntries.map(r => r(0)).collect().foreach(println)
Обратите внимание на версию метода where
, который я использовал, объявленную в документе doc:
def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD
Итак, UDF может принимать только один аргумент, но вы можете составить несколько вызовов .where()
для фильтрации по нескольким столбцам.
Изменить для Spark 1.2.0 (и действительно 1.1.0 тоже)
Пока он не документирован, Spark теперь поддерживает регистрацию UDF, поэтому он может быть запрошен с SQL.
Вышеуказанный UDF можно зарегистрировать, используя:
sqlContext.registerFunction("myDateFilter", myDateFilter)
и если таблица была зарегистрирована
sqlContext.registerRDDAsTable(entries, "entries")
он может быть запрошен с помощью
sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)")
Подробнее см. этот пример.