Spark - загрузить CSV файл как DataFrame?
Я хотел бы прочитать CSV в искровом режиме и преобразовать его в DataFrame и сохранить его в HDFS с помощью df.registerTempTable("table_name")
Я пробовал:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Ошибка, которую я получил:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Какова правильная команда для загрузки CSV файла в качестве DataFrame в Apache Spark?
Ответы
Ответ 1
spark-csv является частью основной функциональности Spark и не требует отдельной библиотеки. Так что вы могли бы просто сделать, например,
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
В scala (это работает для любого формата с указанием разделителя, "," для csv, "\ t" для tsv и т.д.) val df = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", ",").load("csvfile.csv")
Ответ 2
Разобрать CSV и загрузить как DataFrame/DataSet с Spark 2.x
Сначала инициализируйте объект SparkSession
по умолчанию, он будет доступен в оболочках как spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;
Используйте любой из следующих способов загрузки CSV как DataFrame/DataSet
1. Сделайте это программным способом
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
val df = spark.sql("SELECT * FROM csv.'hdfs:///csv/file/dir/file.csv'")
Зависимости:
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Версия Spark <2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
зависимости:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
Ответ 3
Это для чьего Hadoop 2.6 и Spark 1.6 и без пакета "databricks".
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
Ответ 4
С Spark 2.0 следуйте следующим образом: CSV
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
Ответ 5
В Java 1.8 Этот фрагмент кода отлично работает для чтения файлов CSV
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Ответ 6
Пример Penny Spark 2 - способ сделать это в spark2. Там есть еще один трюк: создайте для этого заголовок, выполнив начальное сканирование данных, установив опцию inferSchema
на true
Здесь, например, при условии, что spark
- это искровой сеанс, который вы настроили, - это операция загрузки в индексный файл CSV всех изображений Landsat, которые расположены на узле Amazon на S3.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
Плохая новость: это вызывает сканирование через файл; для чего-то большого, подобного этому 20 + МБ, заархивированного CSV файла, который может занять 30 секунд в течение длительного времени. Имейте это в виду: вам лучше вручную кодировать схему, как только вы ее получите.
(фрагмент кода Apache Software License 2.0 лицензирован, чтобы избежать всякой двусмысленности, что я сделал как демонстрационный/интеграционный тест интеграции S3)
Ответ 7
Существует множество проблем при анализе CSV файла, он продолжает складываться, если размер файла больше, если в значениях столбца есть символы, отличные от английского /escape/separator/other, что может привести к ошибкам синтаксического анализа.
Тогда волшебство используется в опциях. Те, которые работали на меня и надежду, должны охватывать большинство крайних случаев в коде ниже:
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
Надеюсь, это поможет. Подробнее см.: Использование PySpark 2 для чтения CSV с исходным кодом HTML
Примечание: приведенный выше код относится к API Spark 2, где API чтения файлов CSV поставляется в комплекте со встроенными пакетами Spark, которые можно установить.
Примечание. PySpark - это оболочка Python для Spark и имеет тот же API, что и Scala/Java.
Ответ 8
В случае, если вы создаете банку с Scala 2.11 и Apache 2.0 или выше.
Нет необходимости создавать объект sqlContext
или sparkContext
. Просто объект SparkSession
удовлетворяет требованиям для всех нужд.
Ниже приведен мой код, который работает нормально:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
Если вы работаете в кластере, просто измените .master("local")
на .master("yarn")
sparkBuilder
.master("yarn")
при определении объекта sparkBuilder
Документ Spark покрывает это: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
Ответ 9
Попробуйте это, если используете искру 2. 0+
For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
Примечание: - это работает для любого файла с разделителями. Просто используйте параметр ("разделитель",), чтобы изменить значение.
Надеюсь, это полезно.
Ответ 10
Формат файла по умолчанию - Parquet с spark.read.. и чтение файла csv, почему вы получаете исключение. Укажите формат csv с помощью api, который вы пытаетесь использовать.
Ответ 11
Загружает файл CSV и возвращает результат в виде DataFrame.
df=sparksession.read.option("header", true).csv("file_name.csv")
Датафрейм рассматривал файл как формат csv.
Ответ 12
Я могу сделать то же самое, что и ниже:
val conf = new SparkConf().setAppName("Test Spark").setMaster("local[1]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val txtDf = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true").load("D:\\spark-training\\employee.txt")
txtDf.registerTempTable("employee")
val employees = sqlContext.sql("select * from employee")
employees.printSchema()
employees.show()
Ответ 13
ПРИМЕЧАНИЕ. Для версий Spark раньше 1.7
val dataframe = sqlContext.read.format("com.databricks.spark.csv").
option("delimiter", "\t").
option("header", "true").
option("inferSchema", "true").
load("file_name")
для csv использовать разделитель как ',' и изменять параметры по мере необходимости, например header и inferSchema
для python просто возьмите val, и он работает
Но вам нужно передать этот пакет либо вашей искровой оболочке, либо исправить - подать как
spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
or
spark-submit --packages com.databricks:spark-csv_2.10:1.4.0