理解SparkSession
在上一节的程序中,我们在代码的开始,首先创建了一个SparkSession的实例spark。
在Spark 2.0中,SparkSession表示在Spark中操作数据的统一入口点。要创建一个基本的SparkSession,需要使用SparkSession.builder():
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .getOrCreate() // 用于隐式转换,如将RDDs转换为DataFrames import spark.implicits._
在Spark程序中,我们使用构建器设计模式实例化SparkSession对象。然而,在REPL环境中(即在Spark shell会话中),SparkSession会被自动创建,并通过名为spark的实例对象提供给我们使用。
SparkSession对象可以用来配置Spark的运行时配置属性。例如,Spark和Yarn管理的两个主要资源是CPU和内存。如果想为Spark executor设置内核数量和堆大小,那么可以通过分别设置spark.executor.cores和spark.executor.memory属性来实现这一点。例如,在下面的代码片段中,我们分别设置Spark executor运行时属性的核为2个,内存为4G。
spark.conf.set("spark.executor.cores", "2") spark.conf.set("spark.executor.memory", "4g")
下面是在Spark SQL代码中创建SparkSession对象的常用代码模板:
import org.apache.spark.SparkSession import org.apache.spark.SparkContext val conf = SparkConf() // conf.set("spark.app.name", application_name) conf.set("spark.master", master) // master='yarn-client' conf.set("spark.executor.cores", `num_cores`) conf.set("spark.executor.instances", `num_executors`) conf.set("spark.locality.wait", "0") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); val spark = SparkSession .builder() .appName(application_name) .config(conf=conf). getOrCreate()
可以使用SparkSession对象从各种源读取数据,例如CSV、JSON、JDBC、Stream等等。此外,它还可以用来执行sql语句、注册用户定义函数(UDF)和使用Dataset和DataFrame。
注:Spark 2.0中的SparkSession为Hive特性提供了内置支持,包括使用HiveQL编写查询、访问Hive UDF以及从Hive表中读取数据的能力。要使用这些特性,不需要已有的Hive安装。