Итерировать строки и столбцы в области данных Spark
У меня есть следующий пакет данных Spark, который создается динамически:
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Теперь мне нужно перебирать каждую строку и столбец в sqlDF
для печати каждого столбца, это моя попытка:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row
является row
Row
, но не является итерабельным, почему этот код row.foreach
ошибку компиляции в row.foreach
. Как перебирать каждый столбец в Row
?
Ответы
Ответ 1
Вы можете преобразовать Row
в Seq
с помощью toSeq
. После обращения к Seq
вы можете перебирать его, как обычно, с помощью foreach
, map
или всего, что вам нужно
sqlDF.foreach { row =>
row.toSeq.foreach{col => println(col) }
}
Выход:
Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
Ответ 2
Предположим, у вас есть Dataframe
, как показано ниже
+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy| aaa| 20|
|Berta| bbb| 30|
| Joe| ccc| 40|
+-----+------+---+
Чтобы зациклить свой Dataframe и извлечь элементы из Dataframe, вы можете выбрать один из следующих подходов.
Подход 1 - цикл с использованием foreach
Зацикливание цикла данных напрямую с использованием цикла foreach
невозможно. Для этого сначала необходимо определить схему кадра данных с помощью case class
, а затем указать эту схему для кадра данных.
import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))
Пожалуйста, смотрите результат ниже:
Подход 2 - цикл с использованием rdd
Используйте rdd.collect
поверх вашего кадра данных. Переменная row
будет содержать каждую строку Dataframe типа строки rdd
. Чтобы получить каждый элемент из строки, используйте row.mkString(",")
, который будет содержать значение каждой строки в значениях, разделенных запятыми. Используя функцию split
(встроенную функцию), вы можете получить доступ к каждому значению столбца строки rdd
с помощью индекса.
for (row <- df.rdd.collect)
{
var name = row.mkString(",").split(",")(0)
var sector = row.mkString(",").split(",")(1)
var age = row.mkString(",").split(",")(2)
}
Обратите внимание, что у этого подхода есть два недостатка.
1. Если в значении столбца есть ,
, данные будут ошибочно разбиты на соседний столбец.
2. rdd.collect
- это action
, который возвращает все данные в память драйвера, где память драйвера может быть не настолько большой, чтобы хранить данные, в результате чего происходит сбой приложения.
Я бы рекомендовал использовать подход 1.
Подход 3 - Используя где и выберите
Вы можете напрямую использовать where
и select
, которые будут внутренне зацикливаться и находить данные. Поскольку он не должен выбрасывать Index из связанной исключительной ситуации, используется условие if
if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
Подход 4 - Использование временных таблиц
Вы можете зарегистрировать фрейм данных как временный, который будет храниться в памяти искры. Затем вы можете использовать запрос на выборку, как и в другой базе данных, для запроса данных, а затем собирать и сохранять в переменной
df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
Ответ 3
sqlDF.foreach
не работает для меня, но Подход 1 из ответа @Sarath Avanavu работает, но иногда он также играл с порядком записей.
Я нашел еще один способ, который работает
df.collect().foreach { row =>
println(row.mkString(","))
}
Ответ 4
Вы должны использовать mkString
в своей Row
:
sqlDF.foreach { row =>
println(row.mkString(","))
}
Но обратите внимание, что это будет напечатано внутри исполнителей JVM, поэтому вы не увидите выход (если вы не работаете с master = local)
Ответ 5
просто соберите результат, а затем примените foreach
df.collect().foreach(println)
Ответ 6
Это работало нормально для меня
sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))