Извлечение `Seq [(String, String, String)]` из искры DataFrame
У меня есть искра DF с строками Seq[(String, String, String)]
. Я пытаюсь сделать что-то вроде flatMap
с этим, но все, что я делаю, заканчивается броском
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema нельзя передать в scala.Tuple3
Я могу взять одну строку или несколько строк из DF просто отлично
df.map{ r => r.getSeq[Feature](1)}.first
возвращает
Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....
и тип данных RDD кажется правильным.
org.apache.spark.rdd.RDD[Seq[(String, String, String)]]
Схема df
root
|-- article_id: long (nullable = true)
|-- content_processed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lemma: string (nullable = true)
| | |-- pos_tag: string (nullable = true)
| | |-- ne_tag: string (nullable = true)
Я знаю, что эта проблема связана с искровым sql, обрабатывающим строки RDD как org.apache.spark.sql.Row
, хотя они идиотически говорят, что это a Seq[(String, String, String)]
. Там есть связанный вопрос (ссылка ниже), но ответ на этот вопрос не работает для меня. Я также недостаточно знаком с искрами, чтобы выяснить, как превратить его в рабочее решение.
Являются ли строки Row[Seq[(String, String, String)]]
или Row[(String, String, String)]
или Seq[Row[(String, String, String)]]
или что-то еще более сумасшедшее.
Я пытаюсь сделать что-то вроде
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)
который, похоже, работает, но фактически не работает
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first
выдает указанную выше ошибку. Итак, как я должен (например) получить первый элемент второго кортежа в каждой строке?
Кроме того, WHY имеет искру, предназначенную для этого, кажется, что идиотский утверждает, что что-то имеет один тип, когда на самом деле он не является и не может быть преобразован в заявленный тип.
Связанный вопрос: исключение GenericRowWithSchema при передаче ArrayBuffer в HashSet в DataFrame в RDD из таблицы Hive
Связанный отчет об ошибке: http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type
Ответы
Ответ 1
Ну, он не утверждает, что это кортеж. Он утверждает, что это struct
, который соответствует Row
:
import org.apache.spark.sql.Row
case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])
val df = Seq(
Record(1L, Seq(
Feature("ancient", "jj", "o"),
Feature("olympia_greece", "nn", "location")
))
).toDF
val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))
Вы найдете точные правила отображения в Руководство по программированию Spark SQL.
Так как Row
не совсем красивая структура, вы, вероятно, захотите сопоставить ее с чем-то полезным:
content.map(_.map {
case Row(lemma: String, pos_tag: String, ne_tag: String) =>
(lemma, pos_tag, ne_tag)
})
или
content.map(_.map ( row => (
row.getAs[String]("lemma"),
row.getAs[String]("pos_tag"),
row.getAs[String]("ne_tag")
)))
Наконец, немного более краткий подход с Datasets
:
df.as[Record].rdd.map(_.content_processed)
или
df.select($"content_processed").as[Seq[(String, String, String)]]
хотя в данный момент это кажется немного затруднительным.
Существует важное отличие первого подхода (Row.getAs
) и второго (Dataset.as
). Первый извлекает объекты как Any
и применяет asInstanceOf
. Последний использует кодеры для преобразования между внутренними типами и желаемым представлением.
Ответ 2
object ListSerdeTest extends App {
implicit val spark: SparkSession = SparkSession
.builder
.master("local[2]")
.getOrCreate()
import spark.implicits._
val myDS = spark.createDataset(
Seq(
MyCaseClass(mylist = Array(("asd", "aa"), ("dd", "ee")))
)
)
myDS.toDF().printSchema()
myDS.toDF().foreach(
row => {
row.getSeq[Row](row.fieldIndex("mylist"))
.foreach {
case Row(a, b) => println(a, b)
}
}
)
}
case class MyCaseClass (
mylist: Seq[(String, String)]
)
Приведенный выше код является еще одним способом работы с вложенной структурой. По умолчанию Spark Encoder будет кодировать TupleX, делая их вложенными структурами, вот почему вы видите это странное поведение. и, как говорили другие в комментарии, вы не можете просто выполнить getAs[T]()
так как это просто приведение (x.asInstanceOf[T]
), поэтому вы получите исключения во время выполнения.