Ответ 1
Вы можете использовать limit(n)
.
sqlContext.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true').load("file_path").limit(20)
Это просто загрузит 20 строк.
У меня есть большой распределенный файл на HDFS, и каждый раз, когда я использую sqlContext с пакетом spark-csv, он сначала загружает весь файл, который занимает довольно много времени.
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
теперь, поскольку я просто хочу выполнить некоторую быструю проверку в разы, все, что мне нужно, это немного/любые n строк всего файла.
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
но все они запускаются после загрузки файла. Не могу ли я просто ограничить количество строк при чтении самого файла? Я имею в виду эквивалент n_rows pandas в spark-csv, например:
pd_df = pandas.read_csv("file_path", nrows=20)
Или может случиться так, что искра фактически не загружает файл, первый шаг, но в этом случае, почему мой шаг загрузки файла занимает слишком много времени?
Я хочу
df.count()
чтобы дать мне только n
, а не все строки, возможно ли это?
Вы можете использовать limit(n)
.
sqlContext.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true').load("file_path").limit(20)
Это просто загрузит 20 строк.
Я понимаю, что чтение нескольких строк не поддерживается модулем spark-csv напрямую, а в качестве обходного пути вы можете просто прочитать файл в виде текстового файла, взять столько строк, сколько вы хотите и сохраните его в некотором временном месте. С сохраненными строками вы можете использовать spark-csv для чтения строк, включая параметр inferSchema
(который вы можете использовать, если вы находитесь в режиме исследования).
val numberOfLines = ...
spark.
read.
text("myfile.csv").
limit(numberOfLines).
write.
text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
read.
option("inferSchema", true). // <-- you are in exploration mode, aren't you?
csv(s"myfile-$numberOfLines.csv")
Не выводя схему и используя limit(n)
, я работал во всех аспектах.
f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)
Примечание.. Если мы используем inferschema='true'
, его снова одно и то же время, и, возможно, отсюда старая вещь.
Но если у нас есть идея схемы, решения Яцека Ласковского тоже хорошо работают.:)
Начиная с PySpark 2.3 вы можете просто загрузить данные в виде текста, ограничить и применить csv reader к результату:
(spark
.read
.options(inferSchema="true", header="true")
.csv(
spark.read.text("/path/to/file")
.limit(20) # Apply limit
.rdd.flatMap(lambda x: x))) # Convert to RDD[str]
Scala-аналог доступен с версии Spark 2.2:
spark
.read
.options(Map("inferSchema" -> "true", "header" -> "true"))
.csv(spark.read.text("/path/to/file").limit(20).as[String])
В Spark 3.0.0 или новее можно также применять функцию limit и использовать from_csv
, но для этого требуется схема, поэтому она, вероятно, не будет соответствовать вашим требованиям.