Как указать схему для CSV файла без использования класса Scala case?
Я загружаю CSV файл в DataFrame, как показано ниже.
val conf=new SparkConf().setAppName("dataframes").setMaster("local")
val sc=new SparkContext(conf)
val spark=SparkSession.builder().getOrCreate()
import spark.implicits._
val df = spark.
read.
format("org.apache.spark.csv").
option("header", true).
csv("/home/cloudera/Book1.csv")
scala> df.printSchema()
root
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- age: string (nullable = true)
Как изменить столбец age
типа Int
?
Ответы
Ответ 1
Существует опция inferSchema
для автоматического распознавания типа переменной:
val df=spark.read
.format("org.apache.spark.csv")
.option("header", true)
.option("inferSchema", true) // <-- HERE
.csv("/home/cloudera/Book1.csv")
Первоначально spark-csv
представлял собой внешнюю библиотеку для блоков данных, но был включен в базовую версию spark начиная с версии 2.0. Вы можете обратиться к документации на странице библиотеки github, чтобы найти доступные опции.
Ответ 2
Учитывая val spark=SparkSession.builder().getOrCreate()
, я думаю, вы используете Spark 2.x.
Прежде всего, обратите внимание, что Spark 2.x имеет встроенную поддержку формата CSV и поэтому не требует указания формата по его длинному имени, т.е. org.apache.spark.csv
, а просто csv
.
spark.read.format("csv")...
Поскольку вы используете оператор csv
, подразумевается формат CSV, поэтому вы можете пропустить/удалить format("csv")
.
// note that I removed format("csv")
spark.read.option("header", true).csv("/home/cloudera/Book1.csv")
С этим у вас есть много вариантов, но я настоятельно рекомендую использовать класс case для... просто схемы. Посмотрите последнее решение, если вам интересно, как это сделать в Spark 2.0.
оператор литья
Вы можете использовать cast.
scala> Seq("1").toDF("str").withColumn("num", 'str cast "int").printSchema
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
Использование StructType
Вы также можете использовать свою собственную ручную схему с StructType и StructField следующим образом:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("str", StringType, true) ::
StructField("num", IntegerType, true) :: Nil)
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
val q = spark.
read.
option("header", true).
schema(schema).
csv("numbers.csv")
scala> q.printSchema
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
Схема DSL
То, что я нашел довольно интересным в последнее время, было так называемым Schema DSL. Вышеприведенная схема, построенная с использованием StructType
и StructField
, может быть переписана следующим образом:
import org.apache.spark.sql.types._
val schema = StructType(
$"str".string ::
$"num".int :: Nil)
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
// or even
val schema = new StructType().
add($"str".string).
add($"num".int)
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = true)
кодеры
Кодеры настолько просты в использовании, что трудно поверить, что вы не можете их хотеть, даже для построения схемы без использования StructType
, StructField
и DataType
.
// Define a business object that describes your dataset
case class MyRecord(str: String, num: Int)
// Use Encoders object to create a schema off the business object
import org.apache.spark.sql.Encoders
val schema = Encoders.product[MyRecord].schema
scala> schema.printTreeString
root
|-- str: string (nullable = true)
|-- num: integer (nullable = false)
Ответ 3
Что вы можете сделать, это использовать UDF в этом случае:
Шаг 1. Создайте udf, который преобразует String в Int.
val stringToIntUDF = udf((value:String)=>value.toInt)
Шаг 2. Примените этот UDF к столбцу, который вы хотите преобразовать!
val updatedDF = df.withColumns("age",stringToIntUDF(df("age")))
updatedDF.printSchema
Это даст вам желаемый результат!
Если вы просто хотите вывести вашу схему из файла CSV. Тогда решение @vdep кажется правильным!
val df=spark.read
.format("org.apache.spark.csv")
.option("header",true)
.option("inferSchema", "true") // <-- HERE
.csv("/home/cloudera/Book1.csv")