通过Spark Web UI理解Spark程序执行
Apache Spark提供了一套Web UI/用户界面(Jobs, Stages, Tasks, Storage, Environment, Executors, and SQL)来监控Spark/PySpark应用程序的状态、Spark集群的资源消耗和Spark配置。
为了更好地理解Spark是如何执行Spark/PySpark作业的,这组用户界面会派上用场。在本教程中,我们将运行一个小应用程序,并通过使用Spark Web UI中的不同部分来解释Spark是如何执行这个应用程序的。
在进入Spark UI之前,首先了解以下两个概念:
- Transformation
- Action
Spark 应用程序代码是一组指令,它指示驱动程序执行一个Spark作业,并让驱动程序决定如何在executors的帮助下完成该作业。
对驱动程序的指令称为 Transformations(转换),而Action(操作)将触发执行。
下面是一个小的Spark应用程序,可以进行transformation和action。
// 数据源文件
val filePath = "file:///home/hduser/data/spark/resources/people.csv"
// Transformation
val df = spark.read // job(0):read
.option("inferSchema", "true") // job(1):inferSchema
.option("header", "true")
.option("sep", ";")
.csv(filePath)
// Action
df.count() // job(2):get count
上面的代码通过读取一个.csv文件来创建一个DataFrame,并检查DataFrame的计数。接下来让我们来了解一下应用程序是如何在Spark UI中进行投影的。
Spark UI包含以下选项卡。
- Spark Jobs
- Stages
- Tasks
- Storage
- Environment
- Executors
- SQL
Spark UI默认运行在端口4040上,可以使用http://IP地址:4040/访问Spark UI。
下面是一些额外的UI,可以帮助跟踪Spark应用程序:
- Spark应用程序UI:http://localhost:4040/
- 资源管理器:http://localhost:9870
- Spark JobTracker:http://localhost:8088/
- 节点详细信息:http://localhost:8042/
注意:要访问这些URL, Spark应用程序必须处于运行状态。如果想要在任何Spark应用程序状态下都可以访问这个URL,需要启动Spark历史服务器。
1、Spark Jobs 选项卡
在Jobs选项卡下了解的详细信息有调度模式、Spark作业的数量、它具有的stages数量和Spark作业中的描述。
1)Spark作业数量
Spark作业的数量等于应用程序中的action数量,每个Spark作业应该至少有一个Stage。
在上面的应用程序中,我们执行了3个Spark作业(0,1,2):
- Job 0:读取CSV文件。
- Job 1:从文件中推断模式(inferSchema)。
- Job 2:count统计。
因此,如果我们看一下图,它清楚地显示了3个action的3个Spark作业结果。
2)Stages的数量
每个宽转换产生一个单独的Stages数量。在我们的例子中,Spark job0和Spark job1有单独的单个Stage,但是当涉及到Spark job 3时,我们可以看到两个Stages,因为数据分区。默认情况下,数据被划分为两个文件。
3)Description
Description链接了相关Spark Job的完整详细信息,如Spark Job Status、DAG Visualization、Completed Stages等。如下图所示:
2、Stages 选项卡
我们可以通过两种方式导航到Stage Tab选项卡。
(1)选择相应Spark作业的Description描述链接(仅显示所选Spark作业的Stages阶段)。
(2)在Spark Job选项卡的顶部选择Stages选项(显示应用程序中的所有阶段)。
在我们的应用程序中,我们总共有4个Stages。
Stage选项卡显示了一个摘要页面,其中显示了Spark应用程序中所有Spark作业的所有stages(阶段)的当前状态
每个stage(阶段)中可以看到的任务数量是spark将要处理的分区数量,每个stage(阶段)中的每个任务都是由spark完成的相同工作,但在不同的数据分区上。
Stage详细信息
Stage(阶段)的细节展示了该阶段的有向无环图(DAG),其中顶点表示RDD或DataFrame,边表示要应用的操作。
让我们分阶段分析操作。Stage0中的操作是:
- (1)FileScanRDD
- (2)MapPartitionsRDD
(1)FileScanRDD
FileScan表示从文件中读取数据。在我们的场景中,读取CSV文件。
给出的FilePartitions是带有PartitionedFiles(文件块)的自定义RDD分区。
(2)MapPartitionsRDD
MapPartitionsRDD将在使用map分区转换时创建。
Stage(1)中的操作是:
- (1)FileScanRDD
- (2)MapPartitionsRDD
- (3)SQLExecutionRDD
因为已经解释了FileScanRDD和MapPartitionsRDD,接下来让我们看看SQLExecutionRDD。
(3)SQLExecutionRDD
SQLExecutionRDD是一个Spark属性,用于跟踪多个Spark作业,这些作业应该一起构成一个结构化的查询执行。
在Stage(2)和Stage(3)中的操作是:
- (1)FileScanRDD
- (2)MapPartitionsRDD
- (4)WholeStageCodegen
- (5)Exchange
(4)WholeStageCodegen
Spark SQL中的一个物理查询优化器,融合了多个物理操作符。
(5)Exchange
由于count方法而执行Exchange(交换)。
由于数据被划分为多个分区并在执行器(executor)之间共享,因此要获得count,应该将来自各个分区的count相加。
跨集群(Executor)的数据移动,称为shuffle。这是最昂贵的操作,如果分区数量越多,执行器之间的数据交换也会越多。
Tasks
Tasks位于各自stage(阶段)的底部空间。
查看任务页面的关键内容有:
- Input Size – Stage的输入。
- Shuffle Write Size - Stage的写输出。
3、Storage选项卡页面
Storage选项卡显示应用程序中持久化的RDD和DataFrame(如果有的话)。摘要页面(summary)显示所有RDD的存储级别、大小和分区,详细信息页面(details)显示RDD或DataFrame中所有分区的大小和使用的执行器(executor)。
4、Environment选项卡页面
Environment选项卡显示不同环境和配置变量的值,包括JVM、Spark和系统属性。这是检查属性设置是否正确的有用地方。
这个环境页面有六个部分:
- Runtime Information:只是包含运行时属性,如Java和Scala的版本。
- Spark Properties:列出应用程序的属性,如'spark.app.name'和'spark.driver.memory'。
- Resource Profiles:列出所使用的资源轮廓。
- Hadoop Properties:显示与Hadoop和YARN相关的属性。注意:像“spark.hadoop”这样的属性不是在这一部分,而是在Spark Properties中显示。
- System Properties:显示有关JVM的更多细节。
- Classpath Entries:列出从不同源加载的类,这对于解决类冲突非常有用。
5、Executors 选项卡页面
Executors选项卡显示了为应用程序创建的执行器的摘要信息,包括内存和磁盘使用情况以及任务和随机信息。“Storage Memory”列显示用于缓存数据的已使用和预留的内存量。
Executors选项卡不仅提供每个执行器使用的内存量、磁盘和内核等资源信息,还提供性能信息。
在执行器中:
- 核数(Cores)为2。因为使用默认系统核数量。
- 任务数(Total Tasks)为4。
6、SQL 选项卡页面
如果应用程序执行Spark SQL查询,那么SQL选项卡将显示信息,例如查询的持续时间、Spark作业以及物理和逻辑计划。
在我们的应用程序中,我们对文件和DataFrame执行读取和计数操作。因此,读取和计数都列在SQL选项卡中。