Ответ 1
Вот пример кода, который делает это в обоих направлениях.
Я новичок в BigData. Мне нужно преобразовать файл csv/txt в формат Паркета. Я много искал, но не мог найти прямой способ сделать это. Есть ли способ достичь этого?
Вот пример кода, который делает это в обоих направлениях.
Вы можете использовать сверло Apache, как описано в разделе Преобразование CSV файла в Apache Parquet With Drill.
Вкратце:
Начать сверление Apache:
$ cd /opt/drill/bin $ sqlline -u jdbc:drill:zk=local
Создайте файл Parquet:
-- Set default table format to parquet ALTER SESSION SET 'store.format'='parquet'; -- Create a parquet table containing all data from the CSV table CREATE TABLE dfs.tmp.'/stats/airport_data/' AS SELECT CAST(SUBSTR(columns[0],1,4) AS INT) 'YEAR', CAST(SUBSTR(columns[0],5,2) AS INT) 'MONTH', columns[1] as 'AIRLINE', columns[2] as 'IATA_CODE', columns[3] as 'AIRLINE_2', columns[4] as 'IATA_CODE_2', columns[5] as 'GEO_SUMMARY', columns[6] as 'GEO_REGION', columns[7] as 'ACTIVITY_CODE', columns[8] as 'PRICE_CODE', columns[9] as 'TERMINAL', columns[10] as 'BOARDING_AREA', CAST(columns[11] AS DOUBLE) as 'PASSENGER_COUNT' FROM dfs.'/opendata/Passenger/SFO_Passenger_Data/*.csv';
Попробуйте выбрать данные из нового файла Parquet:
-- Select data from parquet table SELECT * FROM dfs.tmp.'/stats/airport_data/*'
Вы можете изменить местоположение dfs.tmp
, перейдя по http://localhost:8047/storage/dfs
(source: CSV и Parquet).
Я уже разместил ответ о том, как это сделать с помощью Apache Drill. Однако, если вы знакомы с Python, теперь вы можете сделать это, используя Pandas и PyArrow !
Использование pip
:
pip install pandas pyarrow
или используя conda
:
conda install pandas pyarrow -c conda-forge
# csv_to_parquet.py
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
csv_file = '/path/to/my.tsv'
parquet_file = '/path/to/my.parquet'
chunksize = 100_000
csv_stream = pd.read_csv(csv_file, sep='\t', chunksize=chunksize, low_memory=False)
for i, chunk in enumerate(csv_stream):
print("Chunk", i)
if i == 0:
# Guess the schema of the CSV file from the first chunk
parquet_schema = pa.Table.from_pandas(df=chunk).schema
# Open a Parquet file for writing
parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
# Write CSV chunk to the parquet file
table = pa.Table.from_pandas(chunk, schema=parquet_schema)
parquet_writer.write_table(table)
parquet_writer.close()
Я не сравнивал этот код с версией Apache Drill, но, по моему опыту, он достаточно быстрый, преобразуя десятки тысяч строк в секунду (это, конечно, зависит от файла CSV!).
Следующий пример является примером использования spark2.0. Чтение намного быстрее, чем опция inferSchema. Spark 2.0 конвертирует в паркетный файл гораздо эффективнее, чем spark1.6.
import org.apache.spark.sql.types._
var df = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
df = spark.read
.schema(df)
.option("header", "true")
.option("delimiter", "\t")
.csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
df.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
1) Вы можете создать таблицу внешнего улья
create external table emp(name string,job_title string,department string,salary_per_year int)
row format delimited
fields terminated by ','
location '.. hdfs location of csv file '
2) Еще одна таблица улей, в которой будет храниться файл паркета
create external table emp_par(name string,job_title string,department string,salary_per_year int)
row format delimited
stored as PARQUET
location 'hdfs location were you want the save parquet file'
Вставьте таблицу в одну таблицу в таблицу:
insert overwrite table emp_par select * from emp
Читайте csv файлы как Dataframe в Apache Spark с пакетом spark-csv. после загрузки данных в Dataframe сохранить файл данных в паркет файл.
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.load("/home/myuser/data/log/*.csv")
df.saveAsParquetFile("/home/myuser/data.parquet")
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
schema = StructType([
StructField("col1", StringType(), True),
StructField("col2", StringType(), True),
StructField("col3", StringType(), True),
StructField("col4", StringType(), True),
StructField("col5", StringType(), True)])
rdd = sc.textFile('/input.csv').map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df.write.parquet('/output.parquet')