I just got access to Spark 2.0; I have been using Spark 1.6.1 up until this point. Can someone please help me set up a SparkSession using PySpark (Python)? I know the Scala examples available online are similar (here), but I was hoping for a direct walkthrough in Python.

My specific case: I am loading Avro files from S3 in a Zeppelin Spark notebook, then building DataFrames and running various PySpark and SQL queries against them. All of my old queries use sqlContext. I know this is poor practice, but I started my notebook with

sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate()

I can read in the avros with

mydata = sqlContext.read.format("com.databricks.spark.avro").load("s3:...

and build DataFrames with no issues. But once I start querying the DataFrames/temp tables, I keep getting a "java.lang.NullPointerException" error. I think that indicates a translation issue (e.g. old queries worked in 1.6.1 but need to be tweaked for 2.0). The error occurs regardless of query type. So I am assuming

1.) the sqlContext alias is a bad idea

and

2.) I need to properly set up a SparkSession.

So if someone could show me how this is done, or perhaps explain the discrepancies they know of between the different versions of spark, I would greatly appreciate it. Please let me know if I need to elaborate on this question. I apologize if it is convoluted.


Best Answer


from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()

Now, to import a .csv file you can use

df = spark.read.csv('filename.csv', header=True)
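Since the question is actually about Avro files on S3 rather than CSV, a minimal sketch of that flow against the new session might look like the following. The app name, S3 path, and view name are placeholders, and it assumes the com.databricks.spark.avro package from the question is still available on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('avro-migration') \
    .enableHiveSupport() \
    .getOrCreate()

# Read the Avro files (placeholder path; substitute your own S3 location)
mydata = spark.read.format("com.databricks.spark.avro").load("s3://my-bucket/path/")

# Register a temp view and query it through the session instead of the old sqlContext
mydata.createOrReplaceTempView("mydata")
spark.sql("SELECT COUNT(*) FROM mydata").show()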

As you can see in the Scala example, SparkSession is part of the sql module. It is similar in Python; hence, see the pyspark.sql module documentation:

class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)
The entry point to programming Spark with the Dataset and DataFrame API. A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files. To create a SparkSession, use the following builder pattern:

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

From here http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html
You can create a spark session using this:

>>> from pyspark.sql import SparkSession
>>> from pyspark.conf import SparkConf
>>> c = SparkConf()
>>> SparkSession.builder.config(conf=c)
spark = SparkSession.builder \
    .master("local") \
    .enableHiveSupport() \
    .getOrCreate()

spark.conf.set("spark.executor.memory", '8g')
spark.conf.set('spark.executor.cores', '3')
spark.conf.set('spark.cores.max', '3')
spark.conf.set("spark.driver.memory", '8g')

sc = spark.sparkContext
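Note that properties such as spark.executor.memory, spark.driver.memory and spark.cores.max are read when the SparkContext starts, so calling spark.conf.set() on them after getOrCreate() typically has no effect on an already-running context. A safer sketch with the same settings, passed to the builder before the session is created:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .enableHiveSupport() \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "3") \
    .config("spark.cores.max", "3") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

sc = spark.sparkContext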

Here's a useful Python SparkSession class I developed:

#!/bin/python
# -*- coding: utf-8 -*-

#######################
#  SparkSession class #
#######################
class SparkSession:
    # - Notes:
    # The main object is the Spark Context ('sc' object).
    # All new Spark sessions ('spark' objects) share the same underlying Spark context ('sc' object) in the same JVM,
    # but for each Spark context the temporary tables and registered functions are isolated.
    # You can't create a new Spark Context in another JVM by using 'sc = SparkContext(conf)',
    # but it's possible to create several Spark Contexts in the same JVM by setting 'spark.driver.allowMultipleContexts' to true (not recommended).
    # - See:
    # https://medium.com/@achilleus/spark-session-10d0d66d1d24
    # https://stackoverflow.com/questions/47723761/how-many-sparksessions-can-a-single-application-have
    # https://stackoverflow.com/questions/34879414/multiple-sparkcontext-detected-in-the-same-jvm
    # https://stackoverflow.com/questions/39780792/how-to-build-a-sparksession-in-spark-2-0-using-pyspark
    # https://stackoverflow.com/questions/47813646/sparkcontext-getorcreate-purpose?noredirect=1&lq=1

    from pyspark.sql import SparkSession

    spark = None   # The Spark Session
    sc = None      # The Spark Context
    scConf = None  # The Spark Context conf

    def _init(self):
        self.sc = self.spark.sparkContext
        self.scConf = self.sc.getConf()  # or self.scConf = self.spark.sparkContext._conf

    # Return the current Spark Session (singleton), otherwise create a new one
    def getOrCreateSparkSession(self, master=None, appName=None, config=None, enableHiveSupport=False):
        cmd = "self.SparkSession.builder"
        if master is not None: cmd += ".master(" + repr(master) + ")"
        if appName is not None: cmd += ".appName(" + repr(appName) + ")"
        if config is not None: cmd += ".config(" + config + ")"  # config is inserted verbatim into the builder expression
        if enableHiveSupport: cmd += ".enableHiveSupport()"
        cmd += ".getOrCreate()"
        self.spark = eval(cmd)
        self._init()
        return self.spark

    # Return the current Spark Context (singleton), otherwise create a new one via getOrCreateSparkSession()
    def getOrCreateSparkContext(self, master=None, appName=None, config=None, enableHiveSupport=False):
        self.getOrCreateSparkSession(master, appName, config, enableHiveSupport)
        return self.sc

    # Create a new Spark session from the current Spark session (with isolated SQL configurations).
    # The new Spark session shares the underlying SparkContext and cached data,
    # but the temporary tables and registered functions are isolated.
    def createNewSparkSession(self, currentSparkSession):
        self.spark = currentSparkSession.newSession()
        self._init()
        return self.spark

    def getSparkSession(self):
        return self.spark

    def getSparkSessionConf(self):
        return self.spark.conf

    def getSparkContext(self):
        return self.sc

    def getSparkContextConf(self):
        return self.scConf

    def getSparkContextConfAll(self):
        return self.scConf.getAll()

    def setSparkContextConfAll(self, properties):
        # Properties example: { 'spark.executor.memory' : '4g', 'spark.app.name' : 'Spark Updated Conf', 'spark.executor.cores': '4', 'spark.cores.max': '4'}
        self.scConf = self.scConf.setAll(properties)  # or self.scConf = self.spark.sparkContext._conf.setAll()

    # Stop (clears) the active SparkSession for the current thread.
    #def stopSparkSession(self):
    #    return self.spark.clearActiveSession()

    # Stop the underlying SparkContext.
    def stopSparkContext(self):
        self.spark.stop()  # Or self.sc.stop()

    # Returns the active SparkSession for the current thread, returned by the builder.
    #def getActiveSparkSession(self):
    #    return self.spark.getActiveSession()

    # Returns the default SparkSession that is returned by the builder.
    #def getDefaultSession(self):
    #    return self.spark.getDefaultSession()
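A possible usage sketch of the class above (the master, app name, and variable names here are just examples; note that this wrapper class shadows pyspark.sql.SparkSession at module level, so SparkSession() below instantiates the wrapper, not the PySpark class):

# Build (or reuse) a session through the wrapper
sessionWrapper = SparkSession()
spark = sessionWrapper.getOrCreateSparkSession(master="local", appName="demo", enableHiveSupport=True)
sc = sessionWrapper.getSparkContext()

# Inspect the underlying Spark Context configuration
print(sessionWrapper.getSparkContextConfAll())

# Create an isolated session that shares the same SparkContext and cached data
spark2 = sessionWrapper.createNewSparkSession(spark)

# Shut everything down when finished
sessionWrapper.stopSparkContext()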