创建RDD的方法
在对数据进行任何transformation或action操作之前,必须先将这些数据构造为一个RDD。Spark提供了创建RDDs的三种方法,分别为:
- 第一种方法是将现有的集合并行化。
- 另一种方法是加载外部存储系统中的数据集,比如文件系统。
- 第三种方法是在现有RDD上进行转换来得到新的RDD。
将现有的集合并行化以创建RDD
创建RDD的第一种方法是将对象集合并行化,这意味着将其转换为可以并行操作的分布式数据集。对象集合的并行化是通过调用SparkContext类的parallelize方法实现的。
请看下面的代码:
from pyspark.sql import SparkSession spark = SparkSession.builder \ .master("spark://xueai8:7077") \ .appName("pyspark demo") \ .getOrCreate() sc = spark.sparkContext # 通过并行集合(数组)创建RDD array1 = [1,2,3,4,5,6,7,8,9,10] # rdd1 = sc.parallelize(array1) # rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10]) rdd1.collect() # 或者 list2 = list(range(11)) # range(start=0,end,step=0) rdd2 = sc.parallelize(list2) rdd2.collect() # 通过并行集合(数组)创建RDD strList = ["明月几时有","把酒问青天","不知天上宫阙","今夕是何年"] strRDD = sc.parallelize(strList) strRDD.collect()
执行过程及输出结果如下图所示:
从存储系统读取数据集以创建RDD
创建RDD的第二种方法是从存储系统读取数据集,存储系统可以是本地计算机文件系统、HDFS、Cassandra、Amazon S3等等。
请看下面的代码:
# 从文件系统中加载数据创建RDD file = "/data/spark/rdd/wc.txt" # hdfs rdd3 = sc.textFile(file) # sc是SparkContext的实例 print(type(rdd3)) # 查看rdd3的类型 rdd3.collect()
SparkContext类的textFile方法假设每个文件是一个文本文件,并且每行由一个新行分隔。此textFile方法返回一个RDD,它表示所有文件中的所有行。需要注意的重要一点是,textFile方法是延迟计算的,这意味着如果指定了错误的文件或路径,或者错误地拼写了目录名,那么在采取其中一项action操作之前,这个问题不会出现(因此也不会被发现)。
从已有的RDD转换得到新的RDD
创建RDD的第三种方法是调用现有RDD上的一个转换操作。例如,下面的代码通过对rdd4的转换得到一个新的RDD - rdd5:
# 字符转为大写,得到一个新的RDD rdd4 = rdd3.map(lambda line: line.upper()) rdd4.collect()
注:关于map函数,在稍后部分讲解。
创建RDD时指定分区数量
Spark在集群的每个分区上运行一个任务(task),因此必须谨慎地决定优化计算工作。尽管Spark会根据集群自动设置分区数量,但我们可以通过将其作为第二个参数传递给并行化函数。例如:
sc.parallelize(data,3) // 3个分区
下图表示创建一个RDD,包含14条记录(或元组),分区为3,分布在三个节点上: