Ответ 1
Spark 2.1 должен иметь встроенную поддержку для этого варианта использования (см. # 15354).
import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($"c1", $"c2", $"c3")))
Я хотел бы создать JSON из Spark v.1.6 (используя scala) dataframe. Я знаю, что существует простое решение df.toJSON
.
Однако моя проблема выглядит несколько иначе. Рассмотрим, например, фреймворк данных со следующими столбцами:
| A | B | C1 | C2 | C3 |
-------------------------------------------
| 1 | test | ab | 22 | TRUE |
| 2 | mytest | gh | 17 | FALSE |
Я хотел бы иметь в конце фреймворк с
| A | B | C |
----------------------------------------------------------------
| 1 | test | { "c1" : "ab", "c2" : 22, "c3" : TRUE } |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |
где C - JSON, содержащий C1
, C2
, C3
. К сожалению, во время компиляции я не знаю, как выглядит dataframe (за исключением столбцов A
и B
, которые всегда "исправлены" ).
В связи с тем, почему мне это нужно: я использую Protobuf для отправки результатов. К сожалению, у моего DataFrame иногда больше столбцов, чем ожидалось, и я все равно отправлял их через Protobuf, но я не хочу указывать все столбцы в определении.
Как я могу это достичь?
Spark 2.1 должен иметь встроенную поддержку для этого варианта использования (см. # 15354).
import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($"c1", $"c2", $"c3")))
Сначала преобразуем C в struct
:
val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
Эта структура может быть преобразована в JSONL с помощью toJSON
по-прежнему:
dfStruct.toJSON.collect
// Array[String] = Array(
// {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}},
// {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
Мне неизвестен какой-либо встроенный метод, который может конвертировать один столбец, но вы можете либо преобразовать его индивидуально, либо join
или использовать ваш любимый парсер JSON в UDF.
case class C(C1: String, C2: Int, C3: Boolean)
object CJsonizer {
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
implicit val formats = Serialization.formats(org.json4s.NoTypeHints)
def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
}
val cToJSON = udf((c1: String, c2: Int, c3: Boolean) =>
CJsonizer.toJSON(c1, c2, c3))
df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
Здесь нет парсера JSON и он адаптируется к вашей схеме:
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
df.select(
col(df.columns(0)),
col(df.columns(1)),
concat(
lit("{"),
concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
val c = dt._1;
val t = dt._2;
concat(
lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ),
col(c),
lit(if(t=="StringType") "\""; else "")
)
}):_*),
lit("}")
) as "C"
).collect()
Я использую эту команду для решения проблемы to_json:
output_df = (df.select(to_json(struct(col("*"))).alias("content")))