创建RDD的方法

在对数据进行任何transformation或action操作之前,必须先将这些数据构造为一个RDD。Spark提供了创建RDDs的三种方法,分别为:

  • 第一种方法是将现有的集合并行化。
  • 另一种方法是加载外部存储系统中的数据集,比如文件系统。
  • 第三种方法是在现有RDD上进行转换来得到新的RDD。

将现有的集合并行化以创建RDD

创建RDD的第一种方法是将对象集合并行化,这意味着将其转换为可以并行操作的分布式数据集。对象集合的并行化是通过调用SparkContext类的parallelize方法实现的。

请看下面的代码:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
   .master("spark://xueai8:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

sc = spark.sparkContext

# 通过并行集合(数组)创建RDD
array1 = [1,2,3,4,5,6,7,8,9,10]    # 
rdd1 = sc.parallelize(array1)
# rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

rdd1.collect()

# 或者
list2 = list(range(11))    # range(start=0,end,step=0)
rdd2 = sc.parallelize(list2)

rdd2.collect()

# 通过并行集合(数组)创建RDD
strList = ["明月几时有","把酒问青天","不知天上宫阙","今夕是何年"]
strRDD = sc.parallelize(strList)
strRDD.collect()

执行过程及输出结果如下图所示:

从存储系统读取数据集以创建RDD

创建RDD的第二种方法是从存储系统读取数据集,存储系统可以是本地计算机文件系统、HDFS、Cassandra、Amazon S3等等。

请看下面的代码:

#  从文件系统中加载数据创建RDD
file = "/data/spark/rdd/wc.txt"      		# hdfs
rdd3 = sc.textFile(file)                 	# sc是SparkContext的实例

print(type(rdd3))    				# 查看rdd3的类型

rdd3.collect() 

SparkContext类的textFile方法假设每个文件是一个文本文件,并且每行由一个新行分隔。此textFile方法返回一个RDD,它表示所有文件中的所有行。需要注意的重要一点是,textFile方法是延迟计算的,这意味着如果指定了错误的文件或路径,或者错误地拼写了目录名,那么在采取其中一项action操作之前,这个问题不会出现(因此也不会被发现)。

从已有的RDD转换得到新的RDD

创建RDD的第三种方法是调用现有RDD上的一个转换操作。例如,下面的代码通过对rdd4的转换得到一个新的RDD - rdd5:

# 字符转为大写,得到一个新的RDD
rdd4 = rdd3.map(lambda line: line.upper()) 
rdd4.collect()

注:关于map函数,在稍后部分讲解。

创建RDD时指定分区数量

Spark在集群的每个分区上运行一个任务(task),因此必须谨慎地决定优化计算工作。尽管Spark会根据集群自动设置分区数量,但我们可以通过将其作为第二个参数传递给并行化函数。例如:

sc.parallelize(data,3)      // 3个分区

下图表示创建一个RDD,包含14条记录(或元组),分区为3,分布在三个节点上:


《PySpark原理深入与编程实战》