使用IntelliJ IDEA开发Spark SBT应用程序

要开发Spark应用程序,业界普遍采用IntelliJ IDEA这样的集成开发环境。在使用集成开发环境工具开发Spark应用程序时,需要依赖很多Hadoop和Sparkt等的依赖库,目前企业中普遍采用一些项目管理和构建工具(如Maven、SBT等)来管理依赖,以及项目的编译和打包等。Spark官方推荐的是使用SBT(Scala Build Tool,Scala构建工具)来构建Spark项目、管理依赖、编译和打包项目。

本节主要内容包括:

IntelliJ IDEA,一般简称IDEA,是Java语言开发的集成环境。IntelliJ在业界被公认为最好的Java开发工具之一,尤其在智能代码助手、代码自动提示、重构、J2EE支持、Ant、JUnit、CVS整合、代码审查、创新的GUI设计等方面的功能可以说是超常的。IDEA是JetBrains公司的产品,这家公司总部位于捷克共和国的首都布拉格,开发人员以严谨著称的东欧程序员为主。

下载地址:https://www.jetbrains.com/idea/download/#section=windows

IDEA每个版本提供Community和Ultimate两个版本,如下图所示,其中Community是完全免费的,而Ultimate版本可以使用30天,过这段时间后需要收费。开发Spark应用程序,下载一个最新的Community版本即可。


一、安装IntelliJ IDEA

1.首先在官网下载IDEA https://www.jetbrains.com/idea/download/#section=windows。

双击exe文件,安装。

2.傻瓜式安装,一路Next

接着Next,选择安装路径。路径要记牢哦,后面会用到。

3.耐心等待安装

OK,到这里就安装完成了。

安装完成后,我们就可以启动IntelliJ IDEA了。可以通过两种方式启动:

  • 到IntelliJ IDEA安装所在目录下,进入bin目录双击idea.sh启动IntelliJ IDEA;
  • 在命令行终端中,进入$IDEA_HOME/bin目录,输入./idea.sh进行启动。


二、配置IntelliJ IDEA Scala环境

在IDEA中开发scala程序(以及spark程序)需要安装scala插件。IDEA默认情况下并没有安装Scala插件,需要手动进行安装。安装过程并不复杂,下面将演示如何进行安装。

1)在IDEA启动界面上选择“Configure-->Plugins”选项(或者在项目界面,选择“File > Settings... > Plugins”),然后弹出插件管理界面,在该界面上列出了所有安装好的插件。由于Scala插件没有安装,需要点击“Install JetBrains plugins”进行安装,如下图所示:

 
 

2)待安装的插件很多,可以通过查询或者字母顺序找到Scala插件,选择插件后在界面的右侧出现该插件的详细信息,点击绿色按钮“Install plugin”安装插件,如下图所示:

 

3)安装过程将出现安装进度界面,通过该界面了解插件安装进度,如下图所示:

 4)安装完成后,将看到一个按钮,用于重新启动IntelliJ IDE。继续并点击它来重启我们的IntelliJ:

5)最后,重启IntelliJ,让插件生效。

现在我们已经安装了IntelliJ IDEA、Scala插件和SBT,可以开始构建Spark程序了。


三、创建IntelliJ SBT项目

SBT 之于 Scala 就像 Maven 之于 Java,用于管理项目依赖,构建项目。使用IntelliJ IDEA开发Spark应用程序,使用SBT作为构建管理器,这是也官方推荐的开发方式(安装Scala插件时,该Scala插件自带SBT工具)。

请按以下步骤,使用IntelliJ IDEA创建一个新的Spark SBT项目。

1)启动IntelliJ IDEA,在开始界面中,选择【create new project】,创建一个新项目。如下图所示:

2)接下来,依次选择【Scala】|【sbt】,然后单击【Next】按钮。如下图所示:

3)在接下来的向导窗口中,将项目命名为“HelloSpark”,指定项目存放的位置,并选择合适的sbt和Scala版本。如下图所示:

4)单击【Finish】按钮继续。IntelliJ应该创建一个具有默认目录结构的新项目。生成所需的所有文件夹可能需要一到两分钟,最终的文件夹结构应该是这样的:

让我们了解一下生成的项目结构,说明如下:

  • .idea:这些是IntelliJ配置文件。
  • project:编译期间使用的文件。例如,在build.properties中指定编译项目时使用的SBT版本。
  • src:源代码。大多数代码应该放在main/scala目录下。测试脚本放在test/scala文件夹下。
  • target:当编译项目时,会生成这个文件夹。
  • build.sbt:sbt配置文件。使用该文件导入第三方库和文档。


四、配置SBT构建文件

在开始编写Spark应用程序之前,我们需要将Spark库和文档导入IntelliJ,这需要在build.sbt文件中进行配置。编辑build.sbt文件,创建工程时生成的初始内容如下:

name := "HelloSpark"

version := "1.0"

scalaVersion := "2.12.14"

向其中添加Spark Core和Spark SQL依赖,内容如下:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.1.2",
  "org.apache.spark" %% "spark-sql" % "3.1.2"
)

说明:SBT依赖库的内容和格式,可以到mvn repository查询。

保存文件后,IntelliJ将自动导入运行Spark所需的库和文档,因此要确保你的电脑是可以连网的。结果如下所示:


五、准备数据文件

接下来,我们将构建一个简单的Spark应用程序,用来对莎士比亚文集(shakespeare.txt)执行单词计数任务。

我们需要在两个地方保存shakespeare.txt数据集。一个在项目中用于本地系统测试,另一个在HDFS (Hadoop分布式文件系统)中用于集群测试。

将PBDP大数据平台中的/home/hduser/data/spark/shakespeare.txt文件上传到HDFS:

1)确保已经启动了HDFS;

2)在终端窗口中,执行以下命令,上传文件到HDFS上:

$ cd /home/hduser/data/spark
$ hdfs dfs -put shakespeare.txt /data/spark/


六、创建Spark应用程序

现在,我们准备开始编写Spark应用程序。

1)回到项目中,在src/main下创建一个名为resources的文件夹(如果它不存在的话),并将shakespear .txt拷贝到该文件夹下。

2)接下来,在src/main/scala下创建一个新类。右击“scala > New > Scala Class”,如下图所示:

3)接下来,IDE会询问是创建一个class、object还是trait。选择object,将该文件命名为“HelloWord”,如下图所示:

4)编辑该源文件,代码如下:

object HelloWord {
  def main(args: Array[String]): Unit ={
    println("Hello World!")
  }
}

5)运行程序。在文件上任何位置单击右键,在弹出的环境菜单中,选择【Run 'HelloWord'】。

6)如果一切正确,该IDE应该在下方的控制台窗口输出“HelloWorld!”。如下图所示:

7)现在我们知道环境已经正确设置,接下来用以下代码替换文件中原来的内容:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object HelloWord {

  def main(args: Array[String]) {

    // 在windows下开发时设置
    System.setProperty("HADOOP_USER_NAME", "hduser")

    // 创建一个SparkContext来初始化Spark
    // Spark 2.0以前的用法
    // val conf = new SparkConf().setMaster("local").setAppName("Word Count")
    // val sc = new SparkContext(conf)

    // Spark 2.0以后的用法
    val spark = SparkSession.builder().master("local[*]").appName("Word Count").getOrCreate()
    val sc = spark.sparkContext

    // 将文本加载到Spark RDD中,它是文本中每一行的分布式表示
    val file = "src/main/resources/shakespeare.txt"
    val textFile = sc.textFile(file)

    // transformation转换
    val counts = textFile.flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    counts.collect.foreach(println)
    System.out.println("全部单词: " + counts.count());

    // 将单词计数结果保存到指定文件中
    val output = "tmp/shakespeareWordCount"
    counts.saveAsTextFile(output)
  }

}

8)和刚才一样,右键单击,并选择【Run ' HelloScala'】来运行程序。这将运行Spark作业并打印莎士比亚作品中出现的每个单词的频率,预期输出如下:

9)此外,如果浏览指定的目录:

counts.saveAsTextFile ("tmp/shakespeareWordCount");

将找到程序的输出:

注意我们在代码中设置了下面这一行代码:

.master("local[*]")

这告诉Spark使用这台计算机在本地运行,而不是在分布式模式下运行。要在多台机器上运行Spark,我们需要更改此值。(稍后我们将看到如何更改)


七、部署分布式Spark应用程序

现在我们已经了解了如何在IDE中直接部署应用程序。这是一种快速构建和测试应用程序的好方法。但是在生产环境中,Spark通常会处理存储在HDFS等分布式文件系统中的数据。Spark通常也以集群模式运行(即分布在许多机器上)。

接下来,我们修改代码,使其能部署到Spark分布式集群上运行。请按以下步骤操作。

1)修改源代码,如下面代码中粗体部分所示:

import org.apache.spark.{SparkConf, SparkContext}

object HelloScala {

  def main(args: Array[String]) {

    // 创建一个SparkContext来初始化Spark
    val spark = SparkSession.builder().appName("Word Count").getOrCreate()
    val sc = spark.sparkContext

    // 将文本加载到Spark RDD中,它是文本中每一行的分布式表示
    val file = "hdfs://localhost:8020/data/spark_demo/shakespeare.txt"
    val textFile = sc.textFile(file)

    // word count
    val counts = textFile.flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    counts.collect.foreach(println)
    System.out.println("全部单词: " + counts.count());

    // 将单词计数结果保存到指定输出目录中
    val output = "hdfs://localhost:8020/data/spark_demo/shakespeareWordCount"
    counts.saveAsTextFile(output)
  }

}

这告诉Spark读写HDFS,而不是本地。

2)创建JAR文件

我们将把这些代码打包到一个已编译的jar文件中,该文件可以部署在Spark集群上。

在IntelliJ IDEA菜单栏选择【Tools】|【Start SBT Shell】,在编辑窗口下方打开sbt shell交互窗口,然后就可以应用sbt的clean、compile、package等命令进行操作。这里我们执行打包命令:

> package

打包过程如下图所示:

这将在项目的“target/scala-2.11”下创建一个名为“hellospark_2.11-1.0”的编译过的jar文件。

3)将该jar包提交到Spark集群上执行。使用spark-submit运行我们的代码。

$ cd ~/bigdata/spark-3.1.2
$ ./bin/spark-submit --class HelloWord --master local[*] ./hellospark_2.11-1.0.jar

提交任务时需要指定主类、要运行的jar包和运行模式(本地或集群):

4)控制台应该打印莎士比亚作品中出现的每个单词的频率,如下所示:

...
(comutual,1)
(ban-dogs,1)
(rut-time,1)
(ORLANDO],4)
(Deceitful,1)
(commits,3)
(GENTLEWOMAN,4)
(honors,10)
(returnest,1)
(topp'd?,1)
(compass?,1)
(toothache?,1)
(miserably,1)
(hen?,1)
(luck?,2)
(call'd,162)
(lecherous,2)
...

5)此外,可通过HDFS或Web UI查看输出文件的内容:

$ hdfs dfs -cat /data/spark_demo/shakespeareWordCount/part-00000


八、远程调试Spark程序

在本节中,我们将学习如何将正在运行的Spark程序连接到调试器,调试器允许我们设置断点并逐行执行代码。在直接从IDE运行时,调试Spark和其他任何程序一样,但是调试远程集群需要一些配置。

1)在计划提交Spark作业的机器上,从终端运行以下代码:

$ export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

2)提交Spark job作业,运行时会出现程序挂起,监听端口中。如下:

$ spark-submit --class HelloWord --master local[*] --driver-java-options  bigdata/hellospark_2.11-1.0.jar

3)在IntelliJ IDEA中配置remote debug:在IntelliJ中选择菜单项【Run】|【Edit Configurations】,编辑运行配置信息。如下:

4)然后单击左上角的+按钮,选择面板左侧的Remote项,增加一个新的远程配置。在面板右侧,使用主机ip地址填写Host(Port字段默认为5005,不需要修改),这将允许在端口5005上关联调试器。需要确保端口5005能够接收入站连接。

5)从IDE中debug此调试配置,调试器将附加此调试配置,并且远程Spark程序将在断点处停止。还可以检查程序中活动变量的值。在试图确定代码中的bug时,这是非常宝贵的。


《Flink原理深入与编程实战》