实验:使用spark-submit工具提交Spark作业

对于公司大数据的批量处理或周期性数据分析/处理任务,通常采用编写好的Spark程序,并通过Spark-submit指令的方式提交给Spark集群进行具体的任务计算,Spark-submit指令可以指定一些向集群申请资源的参数。

1、了解spark-submit指令的各种参数说明

在Linux环境下,可通过spark-submit --help 来了解spark-submit指令的各种参数说明

   $ cd ~/bigdata/spark-2.3.2
   $ ./bin/spark-submit --help    

spark-submit语法如下:

    $ ./bin/spark-submit [options] <lapp jar | python file> [app options]    

其中options的主要标志参数说明如下:

  • (1)--master:指定要连接到的集群管理器
  • (2)--deploy-mode:是否要在本地("client")启动驱动程序,或者在集群中("cluster")的一台worker机器上。在client模式下,spark-submit将在spark-submit被调用的机器上运行驱动程序。在cluster模式下,驱动程序会被发送到集群的一个worker节点上去执行。默认是client模式
  • (3)--class:应用程序的主类(带有main方法的类),如果运行Java或Scala程序
  • (4)--name:应用程序易读的名称,这将显示在Spark的web UI上
  • (5)--jars:一系列jar文件的列表,会被上传并设置到应用程序的classpath上。如果你的应用程序依赖于少量的第三方JAR包,可以将它们加到这里(逗号分隔)
  • (6)--files:一系列文件的列表,会被到应用程序的工作目录。这个标志参数可被用于想要分布到每个节点上的数据文件
  • (7)--py-files:一系列文件的列表,会被添加到应用程序的PYTHONPATH。这可以包括.py、.egg或.zip文件
  • (8)--executor-memory:executor使用的内存数量,以字节为单位。可以指定不同的后缀如"512m"或"15g"
  • (9)--driver-memory:driver进程所使用的内存数量,以字节为单位。可以指定不同的后缀如"512m"或"15g"

2、提交SparkPi程序(Scala语言编写),计算pi值

(1)打开终端窗口

(2)确保已经启动了Spark集群(standalone)模式(启动方式见上一实验)

(3)进入到Spark主目录下,执行以下操作:

   $ cd ~/bigdata/spark-2.4.5
   $ ./bin/spark-submit --master spark://master:7077 
                     --class org.apache.spark.examples.SparkPi 
                     examples/jars/spark-examples_2.11-2.4.5.jar    

说明:

  • --master参数指定要连接的集群管理器,这里是standalone模式
  • --calss参数指定要执行的主类名称(带包名的全限定名称)
  • 最后一个参数是所提交的.jar包

运行结果如下图所示:

......

3、提交pi.py程序(Python语言编写),计算pi值

(1)打开终端窗口

(2)确保已经启动了Spark集群(standalone)模式(启动方式见上一实验)

(3)进入到Spark主目录下,执行以下操作:

   $ cd ~/bigdata/spark-2.4.5
   $ ./bin/spark-submit --master spark://master:7077 examples/src/main/python/pi.py    

说明:

  • --master参数指定要连接的集群管理器,这里是standalone模式
  • 最后一个参数是所提交的python程序

运行结果如下图所示:

......

4、提交Spark程序到YARN集群上执行

(1)打开终端窗口

(2)不需要启动Spark集群。确保已经启动了Hadoop集群。

(3)进入到Spark主目录下,执行以下操作:

 $ ./bin/spark-submit \
   --class org.apache.spark.examples.SparkPi \
   --master yarn \
   --driver-memory 2G \
   ./examples/jars/spark-examples_2.11-2.4.5.jar   

有可能会出现异常:

......
20/05/26 14:12:55 ERROR spark.SparkContext: Error initializing SparkContext.
java.lang.IllegalStateException: Spark context stopped while waiting for backend
......
Caused by: java.nio.channels.ClosedChannelException
java.lang.IllegalStateException: Spark context stopped while waiting for backend
......    

这个问题的原因是由于Java 8的内存过量分配问题(虚拟内存不足),导致与YARN集群的关联可能会丢失。可以通过在yarn-site.xml中设置以下属性来强制YARN忽略它:

    <property>
          <name>yarn.nodemanager.pmem-check-enabled</name>
          <value>false</value>
    </property>

    <property>
           <name>yarn.nodemanager.vmem-check-enabled</name>
           <value>false</value>
    </property>    
  • yarn.nodemanager.pmem-check-enabled:是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
  • yarn.nodemanager.vmem-check-enabled:是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。

也可以修改yarn-site.xml配置文件,设置虚拟内存与物理内存的比率的比例(默认为2.1,这里提高到4):

    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>4</value>
        <description>在为容器设置内存限制时,虚拟内存与物理内存的比率</description>
    </property>

这个参数表示每单位的物理内存总量对应的虚拟内存量,默认是2.1,表示每使用1MB的物理内存,最多可以使用2.1MB的虚拟内存总量。

然后再执行,异常消失,程序正常执行,如下图所示:


《Flink原理深入与编程实战》