理解SparkSession

在上一节的程序中,我们在代码的开始,首先创建了一个SparkSession的实例spark。

在Spark 2.0中,SparkSession表示在Spark中操作数据的统一入口点。要创建一个基本的SparkSession,需要使用SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .getOrCreate()

// 用于隐式转换,如将RDDs转换为DataFrames
import spark.implicits._

在Spark程序中,我们使用构建器设计模式实例化SparkSession对象。然而,在REPL环境中(即在Spark shell会话中),SparkSession会被自动创建,并通过名为spark的实例对象提供给我们使用。

SparkSession对象可以用来配置Spark的运行时配置属性。例如,Spark和Yarn管理的两个主要资源是CPU和内存。如果想为Spark executor设置内核数量和堆大小,那么可以通过分别设置spark.executor.cores和spark.executor.memory属性来实现这一点。例如,在下面的代码片段中,我们分别设置Spark executor运行时属性的核为2个,内存为4G。

spark.conf.set("spark.executor.cores", "2")
spark.conf.set("spark.executor.memory", "4g")

下面是在Spark SQL代码中创建SparkSession对象的常用代码模板:

import org.apache.spark.SparkSession
import org.apache.spark.SparkContext

val conf = SparkConf()
// conf.set("spark.app.name", application_name)
conf.set("spark.master", master)  		// master='yarn-client'
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

val spark = SparkSession
.builder()
.appName(application_name)
.config(conf=conf).
getOrCreate()

可以使用SparkSession对象从各种源读取数据,例如CSV、JSON、JDBC、Stream等等。此外,它还可以用来执行sql语句、注册用户定义函数(UDF)和使用Dataset和DataFrame。

注:Spark 2.0中的SparkSession为Hive特性提供了内置支持,包括使用HiveQL编写查询、访问Hive UDF以及从Hive表中读取数据的能力。要使用这些特性,不需要已有的Hive安装。


《PySpark原理深入与编程实战》