Ответ 1
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
Теперь для импорта некоторых CSV файлов вы можете использовать
df=spark.read.csv('filename.csv',header=True)
Я только получил доступ к искру 2.0; Я использовал искру 1.6.1 до этого момента. Может кто-то, пожалуйста, помогите мне настроить sparkSession, используя pyspark (python)? Я знаю, что примеры scala, доступные в Интернете, похожи (здесь), но я надеялся на прямое прохождение на языке python.
Мой конкретный случай: я загружаю файлы avro из S3 в блокноте zeppelin. Затем строит df и запускает из них различные pyspark и sql-запросы. Все мои старые запросы используют sqlContext. Я знаю, что это плохая практика, но я начал свой ноутбук с помощью
sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate()
.
Я могу читать в avros с помощью
mydata = sqlContext.read.format("com.databricks.spark.avro").load("s3:...
и создавать фреймы данных без проблем. Но как только я начинаю запрашивать таблицы dataframes/temp, я продолжаю получать ошибку "java.lang.NullPointerException". Я думаю, что это свидетельствует о трансляционной ошибке (например, старые запросы, работающие в 1.6.1, но нуждаются в настройке для 2.0). Ошибка возникает независимо от типа запроса. Поэтому я предполагаю
1.) псевдоним sqlContext - плохая идея
и
2.) Мне нужно правильно настроить sparkSession.
Итак, если кто-то может показать мне, как это делается, или, возможно, объяснить несоответствия, которые они знают между разными версиями искры, я бы очень признателен. Пожалуйста, дайте мне знать, если мне нужно уточнить этот вопрос. Прошу прощения, если он запутан.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
Теперь для импорта некоторых CSV файлов вы можете использовать
df=spark.read.csv('filename.csv',header=True)
Отсюда http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html
Вы можете создать сеанс зажигания, используя это:
>>> from pyspark.conf import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
Как вы можете видеть в примере с scala, Spark Session является частью модуля sql. Похоже на питона. следовательно, смотрите документацию модуля pyspark sql
class pyspark.sql.SparkSession(sparkContext, jsparkSession = None) Точка входа в программирование Spark с помощью API набора данных и DataFrame. SparkSession может использоваться для создания DataFrame, регистрации DataFrame в виде таблиц, выполнения SQL над таблицами, кэширования таблиц и чтения файлов паркета. Чтобы создать SparkSession, используйте следующий шаблон компоновщика:
>>> spark = SparkSession.builder \
... .master("local") \
... .appName("Word Count") \
... .config("spark.some.config.option", "some-value") \
... .getOrCreate()
spark = SparkSession.builder\
.master("local")\
.enableHiveSupport()\
.getOrCreate()
spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory",'8g')
sc = spark.sparkContext
Вот полезный класс Python SparkSession, который я разработал:
#!/bin/python
# -*- coding: utf-8 -*-
######################
# SparkSession class #
######################
class SparkSession:
# - Notes:
# The main object if Spark Context ('sc' object).
# All new Spark sessions ('spark' objects) are sharing the same underlying Spark context ('sc' object) into the same JVM,
# but for each Spark context the temporary tables and registered functions are isolated.
# You can't create a new Spark Context into another JVM by using 'sc = SparkContext(conf)',
# but it possible to create several Spark Contexts into the same JVM by specifying 'spark.driver.allowMultipleContexts' to true (not recommended).
# - See:
# https://medium.com/@achilleus/spark-session-10d0d66d1d24
# https://stackoverflow.com/info/47723761/how-many-sparksessions-can-a-single-application-have
# https://stackoverflow.com/info/34879414/multiple-sparkcontext-detected-in-the-same-jvm
# https://stackoverflow.com/info/39780792/how-to-build-a-sparksession-in-spark-2-0-using-pyspark
# https://stackoverflow.com/info/47813646/sparkcontext-getorcreate-purpose?noredirect=1&lq=1
from pyspark.sql import SparkSession
spark = None # The Spark Session
sc = None # The Spark Context
scConf = None # The Spark Context conf
def _init(self):
self.sc = self.spark.sparkContext
self.scConf = self.sc.getConf() # or self.scConf = self.spark.sparkContext._conf
# Return the current Spark Session (singleton), otherwise create a new oneÒ
def getOrCreateSparkSession(self, master=None, appName=None, config=None, enableHiveSupport=False):
cmd = "self.SparkSession.builder"
if (master != None): cmd += ".master(" + master + ")"
if (appName != None): cmd += ".appName(" + appName + ")"
if (config != None): cmd += ".config(" + config + ")"
if (enableHiveSupport == True): cmd += ".enableHiveSupport()"
cmd += ".getOrCreate()"
self.spark = eval(cmd)
self._init()
return self.spark
# Return the current Spark Context (singleton), otherwise create a new one via getOrCreateSparkSession()
def getOrCreateSparkContext(self, master=None, appName=None, config=None, enableHiveSupport=False):
self.getOrCreateSparkSession(master, appName, config, enableHiveSupport)
return self.sc
# Create a new Spark session from the current Spark session (with isolated SQL configurations).
# The new Spark session is sharing the underlying SparkContext and cached data,
# but the temporary tables and registered functions are isolated.
def createNewSparkSession(self, currentSparkSession):
self.spark = currentSparkSession.newSession()
self._init()
return self.spark
def getSparkSession(self):
return self.spark
def getSparkSessionConf(self):
return self.spark.conf
def getSparkContext(self):
return self.sc
def getSparkContextConf(self):
return self.scConf
def getSparkContextConfAll(self):
return self.scConf.getAll()
def setSparkContextConfAll(self, properties):
# Properties example: { 'spark.executor.memory' : '4g', 'spark.app.name' : 'Spark Updated Conf', 'spark.executor.cores': '4', 'spark.cores.max': '4'}
self.scConf = self.scConf.setAll(properties) # or self.scConf = self.spark.sparkContext._conf.setAll()
# Stop (clears) the active SparkSession for current thread.
#def stopSparkSession(self):
# return self.spark.clearActiveSession()
# Stop the underlying SparkContext.
def stopSparkContext(self):
self.spark.stop() # Or self.sc.stop()
# Returns the active SparkSession for the current thread, returned by the builder.
#def getActiveSparkSession(self):
# return self.spark.getActiveSession()
# Returns the default SparkSession that is returned by the builder.
#def getDefaultSession(self):
# return self.spark.getDefaultSession()