Преобразование Pandas данных в ошибку Spark dataframe
Я пытаюсь преобразовать Pandas DF в Spark.
DF head:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
код:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
И у меня есть ошибка:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Ответы
Ответ 1
Вам нужно убедиться, что ваши столбцы данных pandas подходят для типа искры. Если ваш фреймворк pandas перечисляет что-то вроде:
pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
И вы получите эту ошибку:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
Теперь убедитесь, что .astype(str)
- это фактически тот тип, которым вы хотите, чтобы эти столбцы были. В основном, когда базовый код Java пытается вывести тип из объекта в python, он использует некоторые наблюдения и делает предположение, если это предположение не распространяется на все данные в столбцах (столбцах), которые он пытается преобразовать из pandas, чтобы исправить это не удастся.
Ответ 2
Ошибок, связанных с типом, можно избежать путем наложения схемы следующим образом:
примечание: создан текстовый файл (test.csv) с исходными данными (как указано выше) и вставлены гипотетические имена столбцов ("col1", "col2",..., "col25").
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
содержимое фрейма данных панд:
pdDF
col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... col16 col17 col18 col19 col20 col21 col22 col23 col24 col25
0 10000001 1 0 1 12:35 OK 10002 1 0 9 ... 3 9 0 0 1 1 0 0 4 543
1 10000001 2 0 1 12:36 OK 10002 1 0 9 ... 3 9 2 1 1 3 1 3 2 611
2 10000002 1 0 4 12:19 PA 10003 1 1 7 ... 2 15 2 0 2 3 1 2 2 691
Далее создайте схему:
from pyspark.sql.types import *
mySchema = StructType([ StructField("Col1", LongType(), True)\
,StructField("Col2", IntegerType(), True)\
,StructField("Col3", IntegerType(), True)\
,StructField("Col4", IntegerType(), True)\
,StructField("Col5", StringType(), True)\
,StructField("Col6", StringType(), True)\
,StructField("Col7", IntegerType(), True)\
,StructField("Col8", IntegerType(), True)\
,StructField("Col9", IntegerType(), True)\
,StructField("Col10", IntegerType(), True)\
,StructField("Col11", StringType(), True)\
,StructField("Col12", StringType(), True)\
,StructField("Col13", IntegerType(), True)\
,StructField("Col14", IntegerType(), True)\
,StructField("Col15", IntegerType(), True)\
,StructField("Col16", IntegerType(), True)\
,StructField("Col17", IntegerType(), True)\
,StructField("Col18", IntegerType(), True)\
,StructField("Col19", IntegerType(), True)\
,StructField("Col20", IntegerType(), True)\
,StructField("Col21", IntegerType(), True)\
,StructField("Col22", IntegerType(), True)\
,StructField("Col23", IntegerType(), True)\
,StructField("Col24", IntegerType(), True)\
,StructField("Col25", IntegerType(), True)])
Примечание: True
(подразумевается допускаемое значение nullable)
создайте фрейм данных pyspark:
df = spark.createDataFrame(pdDF,schema=mySchema)
подтвердите, что фрейм данных pandas теперь является фреймом данных pyspark:
type(df)
выход:
pyspark.sql.dataframe.DataFrame
В сторону:
Чтобы ответить на комментарий Кейт ниже - для наложения общей (строковой) схемы вы можете сделать следующее:
df=spark.createDataFrame(pdDF.astype(str))
Ответ 3
Я пробовал это с вашими данными, и он работает:
%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
Ответ 4
Я получил похожее сообщение об ошибке один раз, в моем случае это произошло потому, что мой информационный фрейм pandas содержал NULL. Я рекомендую попробовать и обработать это в пандах перед тем, как перейти к искре (это решило проблему в моем случае).
Ответ 5
Я сделал этот алгоритм, он работал для моих 10 кадров данных панд
from pyspark.sql.types import *
# Auxiliar functions
def equivalent_type(f):
if f == 'datetime64[ns]': return DateType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
# Given pandas dataframe, it will return a spark dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
i = 0
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return sqlContext.createDataFrame(pandas_df, p_schema)
Вы можете увидеть это также в этой сути
Для этого вам просто нужно вызвать spark_df = pandas_to_spark(pandas_df)