Загрузить CSV файл с помощью Spark
Я новичок в Spark, и я пытаюсь читать CSV-данные из файла с помощью Spark.
Вот что я делаю:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Я бы ожидал, что этот вызов даст мне список двух первых столбцов моего файла, но я получаю эту ошибку:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
хотя мой CSV файл содержит более одного столбца.
Ответы
Ответ 1
Вы уверены, что все строки имеют не менее 2 столбцов? Вы можете попробовать что-то вроде, просто чтобы проверить?:
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
В качестве альтернативы вы можете распечатать виновника (если есть):
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
Ответ 2
Spark 2.0.0 +
Вы можете напрямую использовать встроенный источник данных csv:
spark.read.csv(
"some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
или
(spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv"))
без каких-либо внешних зависимостей.
Spark & lt; 2.0.0
Вместо ручного разбора, который в общем случае далек от тривиального, я бы рекомендовал spark-csv
:
Убедитесь, что Spark CSV включен в путь (--packages
, --jars
, --driver-class-path
)
И загрузите данные следующим образом:
(df = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
Он может обрабатывать загрузку, вывод схемы, отбрасывать неверные строки и не требует передачи данных с Python в JVM.
Примечание
Если вы знаете схему, лучше избегать вывода схемы и передать ее на DataFrameReader
. Предполагая, что у вас есть три столбца - целое, двойное и строковое:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
Ответ 3
И еще одна опция, состоящая в чтении CSV файла с помощью Pandas, а затем импортировании Pandas DataFrame в Spark.
Например:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
Ответ 4
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");
print(df.collect())
Ответ 5
Простое разделение запятой также разделит запятые внутри полей (например, a,b,"1,2,3",c
), поэтому это не рекомендуется. Ответ 0323 хорош, если вы хотите использовать API DataFrames, но если вы хотите придерживаться базового Spark, вы можете проанализировать csvs в базовом Python с помощью модуля csv:
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
РЕДАКТИРОВАТЬ: Как упоминалось в комментариях @muon, заголовок будет обрабатываться как любая другая строка, поэтому вам придется извлечь его вручную. Например, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(убедитесь, что header
не header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
до оценки фильтра). Но в этот момент вам, вероятно, лучше использовать встроенный парсер csv.
Ответ 6
Теперь есть еще один вариант для любого общего файла csv: https://github.com/seahboonsiew/pyspark-csv следующим образом:
Предположим, что мы имеем следующий контекст
sc = SparkContext
sqlCtx = SQLContext or HiveContext
Сначала распределите pyspark-csv.py исполнителям, используя SparkContext
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
Чтение данных csv через SparkContext и преобразование его в DataFrame
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
Ответ 7
Это соответствует тому, что JP Mercier изначально предложил об использовании Pandas, но с серьезной модификацией: если вы читаете данные в Pandas в кусках, он должен быть более податливым. Смысл, что вы можете анализировать гораздо больший файл, чем Pandas может фактически обрабатывать как единый кусок и передавать его Spark в меньших размерах. (Это также отвечает на комментарий о том, почему нужно использовать Spark, если они могут загружать все в Pandas в любом случае.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
Ответ 8
Если вы хотите загрузить csv в качестве фрейма данных, вы можете сделать следующее:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
Это сработало для меня.
Ответ 9
Если ваши данные csv не содержат строк в любом из полей, вы можете загрузить свои данные с помощью textFile()
и проанализировать его
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name1", "name2"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
Ответ 10
Если в наборе данных есть одна или несколько строк с меньшим или большим числом столбцов, чем 2, то эта ошибка может возникнуть.
Я также новичок в Pyspark и пытаюсь прочитать файл CSV. Следующий код работал для меня:
В этом коде я использую набор данных из kaggle, ссылка: https://www.kaggle.com/carrie1/ecommerce-data
1. Без упоминания схемы:
from pyspark.sql import SparkSession
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()
Теперь проверьте столбцы: sdfData.columns
Выход будет:
['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']
Проверьте тип данных для каждого столбца:
sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))
Это даст фрейм данных со всеми столбцами с типом данных как StringType
2. Со схемой: если вы знаете схему или хотите изменить тип данных любого столбца в приведенной выше таблице, воспользуйтесь этим (допустим, у меня есть следующие столбцы, и я хочу, чтобы они имели определенный тип данных для каждого из них)
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([\
StructField("InvoiceNo", IntegerType()),\
StructField("StockCode", StringType()), \
StructField("Description", StringType()),\
StructField("Quantity", IntegerType()),\
StructField("InvoiceDate", StringType()),\
StructField("CustomerID", DoubleType()),\
StructField("Country", StringType())\
])
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL example: Reading CSV file with schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)
Теперь проверьте схему для типа данных каждого столбца:
sdfData.schema
StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
Отредактировано: Мы также можем использовать следующую строку кода без явного упоминания схемы:
sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema
Выход:
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))
Вывод будет выглядеть так:
sdfData.show()
+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047|
| 536367| 22745|POPPY PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22748|POPPY PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
Ответ 11
Как правило, вы не пытаетесь разобрать CSV вручную. Здесь решение без зависимостей, которое будет обрабатывать любые escape-строки, например, строки с кавычками:
import csv # Python standard CSV library
def csv_to_rdd(csv_filename):
return sc.textFile(csv_filename) \
.map(lambda line: tuple(list(csv.reader([line]))[0]))
Ответ 12
При использовании spark.read.csv
я обнаружил, что использование параметров escape='"'
и multiLine=True
обеспечивает наиболее согласованное решение для стандарта CSV, и, по моему опыту, лучше всего работает с файлами CSV, экспортированными из Google Sheets.
То есть,
#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
inferSchema=False, header=True)
Ответ 13
import pandas as pd
data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")