Получить CSV для Spark dataframe
Я использую python на Spark и хотел бы получить csv в dataframe.
Документация для Spark SQL странно не дает объяснений CSV в качестве источника.
Я нашел Spark-CSV, однако у меня есть проблемы с двумя частями документации:
-
"This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3"
Должен ли я действительно добавлять этот аргумент каждый раз, когда я запускаю pyspark или spark-submit? Это кажется очень неэлегантным. Разве нет способа импортировать его в python, а не перегружать его каждый раз?
-
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
Даже если я сделаю это, это не сработает. Что означает аргумент "source" в этой строке кода? Как просто загрузить локальный файл в Linux, скажем "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?
Ответы
Ответ 1
Прочтите файл csv в RDD, а затем сгенерируйте RowRDD из исходного RDD.
Создайте схему, представленную StructType, соответствующую структуре строк в RDD, созданной на шаге 1.
Применить схему к RDD строк с помощью метода createDataFrame, предоставленного SQLContext.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
источник: СПРАВОЧНИК ПРОГРАММИРОВАНИЯ
Ответ 2
from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)
Employee_rdd = sc.textFile("\..\Employee.csv")
.map(lambda line: line.split(","))
Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])
Employee_df.show()
Ответ 3
Если вы не против дополнительной зависимости пакета, вы можете использовать Pandas для разбора CSV файла. Он обрабатывает внутренние запятые просто отлично.
Зависимости:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
Прочитайте весь файл сразу в Spark DataFrame:
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2'])
s_df = sql_sc.createDataFrame(pandas_df)
Или, еще более сознательно, вы можете записать данные в Spark RDD, затем DF:
chunk_100k = pd.read_csv('file.csv', chunksize=100000)
for chunky in chunk_100k:
Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
try:
Spark_full_rdd += Spark_temp_rdd
except NameError:
Spark_full_rdd = Spark_temp_rdd
del Spark_temp_rdd
Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
Ответ 4
После Spark 2.0 рекомендуется использовать Spark Session:
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Create a SparkSession
spark = SparkSession \
.builder \
.appName("basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
def mapper(line):
fields = line.split(',')
return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))
lines = spark.sparkContext.textFile("file.csv")
df = lines.map(mapper)
# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(df).cache()
schemaDf.createOrReplaceTempView("tablename")
Ответ 5
С более свежими версиями Spark (по-моему, 1.4) это стало намного проще. Выражение sqlContext.read
дает вам DataFrameReader
экземпляр с помощью метода .csv()
:
df = sqlContext.read.csv("/path/to/your.csv")
Обратите внимание, что вы также можете указать, что файл csv имеет заголовок, добавив к аргументу header=True
аргумент ключевого слова .csv()
. Доступно несколько других опций и описано в ссылке выше.
Ответ 6
для Pyspark, предполагая, что первая строка файла csv содержит заголовок
spark = SparkSession.builder.appName('chosenName').getOrCreate()
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)
Ответ 7
У меня возникла аналогичная проблема. Решение состоит в том, чтобы добавить переменную среды с именем "PYSPARK_SUBMIT_ARGS" и установить ее значение в "--packages com.databricks: spark-csv_2.10: 1.4.0 pyspark-shell". Это работает с интерактивной оболочкой Spark Python.
Убедитесь, что вы используете версию spark-csv с установленной версией Scala. При Scala 2.11 это искро-csv_2.11 и Scala 2.10 или 2.10.5 оно искро-csv_2.10.
Надеюсь, что это сработает.
Ответ 8
Основываясь на ответе Араванда, но гораздо короче, например.
lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
df = lines.toDF(["year", "month", "day", "count"])