Ответ 1
Spark> = 2,4
Если необходимо, схему можно определить с помощью функции schema_of_json
(обратите внимание, что это предполагает, что произвольная строка является действительным представителем схемы).
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))
Spark> = 2.1
Вы можете использовать функцию from_json
:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Spark> = 1.6
Вы можете использовать get_json_object
, который принимает столбец и путь:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
и извлекает поля в отдельные строки, которые затем могут быть преобразованы в ожидаемые типы.
Аргумент path
выражается с использованием точечного синтаксиса, при этом начальный $.
обозначает корень документа (поскольку в приведенном выше коде используется интерполяция строк, $
необходимо экранировать, следовательно, $$.
).
Spark & lt; = 1,5:
Возможно ли это в настоящее время?
Насколько я знаю, это напрямую невозможно. Вы можете попробовать что-то похожее на это:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
Я предполагаю, что поле blob
не может быть представлено в JSON. В противном случае вы не сможете разделить и присоединиться:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
Альтернативный (более дешевый, хотя и более сложный) подход заключается в использовании UDF для анализа JSON и вывода столбца struct
или map
. Например что-то вроде этого:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)