理解RDD及RDD编程模型

Spark Core包含Spark的基本功能,包括任务调度组件、内存管理、故障恢复、与存储系统交互等。在Spark Core中,核心的数据抽象,称为弹性分布式数据集(RDD)。 RDD是Spark Core的用户级API,要真正理解Spark的工作原理,就必须理解RDD的本质。它们为其他抽象提供了一个非常坚实的基础。

配套视频:

Spark核心数据抽象RDD


Spark为Scala、Java、R和Python编程语言提供了APIs。Spark本身是用Scala编写的,但Spark通过PySpark支持Python。 PySpark构建在Spark的Java API之上(使用Py4J)。通过Spark(PySpark)上的Python的交互式shell,可以对大数据进行交互式数据分析。 数据科学界大多选择Scala或Python来进行Spark程序开发。

1、理解数据抽象RDD

在Spark的编程接口中,每一个数据集都被表示为一个对象,称为RDD。RDD是一个只读的(不可变的)、分区的(分布式的)、容错的、延迟计算的、类型推断的和可缓存的记录集合。

所谓RDD (Resilient Distributed Dataset,弹性分布式数据集),指的是:

  • Resilient:不可变的、容错的
  • Distributed:数据分散在不同节点(机器,进程)
  • Dataset:一个由多个分区组成的数据集

Spark RDD是对跨集群分布的各个分区的引用的集合。参考下图理解:

RDD是Resillient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD是不可变的、容错的、并行的数据结构,允许用户显式地将中间结果持久化到内存中,控制分区以优化数据放置,并使用一组丰富的操作符来操作它们。

RDD被设计成不可变的,这意味着我们不能具体地修改数据集中由RDD表示的特定行。如果调用一个RDD操作来操纵RDD中的行,该操作将返回一个新的RDD。原RDD保持不变,新的RDD将以我们希望的方式包含数据。RDD的不变性本质上要求RDD携带"血统"信息,Spark利用这些信息有效地提供容错能力。

RDD提供了一组丰富的常用数据处理操作。它们包括执行数据转换、过滤、分组、连接、聚合、排序和计数的能力。关于这些操作需要注意的一点是,它们在粗粒度级别上进行操作,这意味着相同的操作应用于许多行,而不是任何特定的行。

综上所述,RDD只是一个逻辑概念,它可能并不对应磁盘或内存中的物理数据。根据Spark官方描述,RDD由以下五部分组成:

  • 一组partition(分区),即组成整个数据集的块;
  • 每个partition(分区)的计算函数(用于计算数据集中所有行的函数);
  • 所依赖的RDD列表(即父RDD列表);
  • (可选的)对于key-value类型的RDD,则包含一个Partitioner(默认是HashPartitioner);
  • (可选的)每个partition数据驻留在集群中的位置(可选);如果数据存放在HDFS上,那么它就是块所在的位置。

Spark运行时使用这5条信息来调度和执行通过RDD操作表示的用户数据处理逻辑。 前三段信息组成“血统”信息,Spark将其用于两个目的。第一个是确定RDDs的执行顺序,第二个是用于故障恢复目的。

Spark通过使用"血统"信息重建失败的部分,自动地代表其用户处理故障。

每一个RDD或RDD分区都知道如何在出现故障时重新创建自己。它有转换的日志,或者血统(lineage),可依据此从稳定存储器或另一个RDD中重新创建自己的。 因此,任何使用Spark的程序都可以确保内置的容错能力,而不考虑底层数据源和RDD类型。

作为Spark中最核心的数据抽象,RDD具有以下特征:

  • In-Memory:RDD会优先使用内存;
  • Immutable(Read-Only):一旦创建不可修改;
  • Lazy evaluated:惰性执行;
  • Cacheable:可缓存,可复用;
  • Parallel:可并行处理;
  • Typed:强类型,单一类型数据;
  • Partitioned:分区的;
  • Location-Stickiness:可指定分区优先使用的节点;

2、RDD编程模型

在Spark中,使用RDD对数据进行处理,通常遵循如下的模型:

  • 首先,将待处理的数据构造为RDD;
  • 对RDD进行一系列操作,包括Transformation和Action两种类型操作;
  • 最后,输出或保存计算结果。

这个处理流程可以用下图表示:

下面我们使用Spark RDD来实现经典的单词计数应用程序。

【示例】使用Spark RDD实现单词计数。这里我们使用Zeppelin作为开发工具,大家可以根据自己的喜好选择任意其他工具。

(1)首先准备一个文本文件wc.txt,内容如下:

    good good study
    day day up         

(2)将该文本文件上传到HDFS的"/data/spark_demo/"目录下:

    $ hdfs dfs -put wc.txt /data/spark_demo/

(3)在Zeppelin中新建一个notebook。在notebook的单元格中,执行代码。

(4)读取数据源文件,构造一个RDD

    val source = "hdfs://localhost:8020/data/wc.txt"
    var textFile = sc.textFile(source)         

(5)将每行数据按空格拆分成单词 - 使用flatMap转换

    val words = textFile.flatMap(line => line.split(" "))

(6)将各个单词加上计数值1 - 使用map转换

    val wordPairs = words.map(word => (word,1))

(7)对所有相同的单词进行聚合相加求各单词的总数 - 使用reduceByKey转换

    val vordCounts = wordPairs.reduceByKey((a,b) => a + b)

(8)返回结果给Driver程序,这一步才触发RDD开始实际的计算 - Action

    wordCounts.collect()         

(9)或者,也可以将计算结果保存到文件中 - Action

    val sink = "hdfs://localhost:8020/data/result"
    wordCounts.saveAsTextFile(sink)        

在Zeppelin中交互式数据处理过程如下图所示:

以上代码也可以精简为下面一句:

    val source = "hdfs://localhost:8020/data/wc.txt"
    sc.textFile(source)
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .collect
      .foreach(println)      

《Flink原理深入与编程实战》