Итерировать строки и столбцы в области данных Spark

У меня есть следующий пакет данных Spark, который создается динамически:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

Теперь мне нужно перебирать каждую строку и столбец в sqlDF для печати каждого столбца, это моя попытка:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

row является row Row, но не является итерабельным, почему этот код row.foreach ошибку компиляции в row.foreach. Как перебирать каждый столбец в Row?

Ответы

Ответ 1

Вы можете преобразовать Row в Seq с помощью toSeq. После обращения к Seq вы можете перебирать его, как обычно, с помощью foreach, map или всего, что вам нужно

    sqlDF.foreach { row => 
           row.toSeq.foreach{col => println(col) }
    }

Выход:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40

Ответ 2

Предположим, у вас есть Dataframe, как показано ниже

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

Чтобы зациклить свой Dataframe и извлечь элементы из Dataframe, вы можете выбрать один из следующих подходов.

Подход 1 - цикл с использованием foreach

Зацикливание цикла данных напрямую с использованием цикла foreach невозможно. Для этого сначала необходимо определить схему кадра данных с помощью case class, а затем указать эту схему для кадра данных.

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

Пожалуйста, смотрите результат ниже:

enter image description here

Подход 2 - цикл с использованием rdd

Используйте rdd.collect поверх вашего кадра данных. Переменная row будет содержать каждую строку Dataframe типа строки rdd. Чтобы получить каждый элемент из строки, используйте row.mkString(","), который будет содержать значение каждой строки в значениях, разделенных запятыми. Используя функцию split (встроенную функцию), вы можете получить доступ к каждому значению столбца строки rdd с помощью индекса.

for (row <- df.rdd.collect)
{   
    var name = row.mkString(",").split(",")(0)
    var sector = row.mkString(",").split(",")(1)
    var age = row.mkString(",").split(",")(2)   
}

Обратите внимание, что у этого подхода есть два недостатка.
1. Если в значении столбца есть ,, данные будут ошибочно разбиты на соседний столбец.
2. rdd.collect - это action, который возвращает все данные в память драйвера, где память драйвера может быть не настолько большой, чтобы хранить данные, в результате чего происходит сбой приложения.

Я бы рекомендовал использовать подход 1.

Подход 3 - Используя где и выберите

Вы можете напрямую использовать where и select, которые будут внутренне зацикливаться и находить данные. Поскольку он не должен выбрасывать Index из связанной исключительной ситуации, используется условие if

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString

Подход 4 - Использование временных таблиц

Вы можете зарегистрировать фрейм данных как временный, который будет храниться в памяти искры. Затем вы можете использовать запрос на выборку, как и в другой базе данных, для запроса данных, а затем собирать и сохранять в переменной

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")

Ответ 3

sqlDF.foreach не работает для меня, но Подход 1 из ответа @Sarath Avanavu работает, но иногда он также играл с порядком записей.

Я нашел еще один способ, который работает

df.collect().foreach { row =>
   println(row.mkString(","))
}

Ответ 4

Вы должны использовать mkString в своей Row:

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

Но обратите внимание, что это будет напечатано внутри исполнителей JVM, поэтому вы не увидите выход (если вы не работаете с master = local)

Ответ 5

просто соберите результат, а затем примените foreach

df.collect().foreach(println)

Ответ 6

Это работало нормально для меня

sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))