示例:加载CSV文件到RDD中
使用SparkContext类中的textFile()方法,我们可以读取单个CSV文件、多个CSV文件(基于模式匹配)、或者从一个目录中读取所有文件到RDD[String]对象。
在开始之前,让我们假设在文件夹~/data/spark/files3中有以下文件名和文件内容,文件内容以逗号分隔,我们将使用这些文件来演示示例。
| 文件名 | 文件内容 |
|---|---|
| text01.csv |
Col1,Col2 one,1 Eleven,11 |
| text02.csv |
Col1,Col2 two,2 Twenty One,21 |
| text03.csv |
Col1,Col2 three,3 |
| text04.csv |
Col1,Col2 four,4 |
| invalid.csv |
Col1,Col2 invalid,I |
点击以下链接,快速浏览相应内容:
读取单个CSV文件到RDD
SparkContext的textFile()方法将整个CSV记录作为字符串读取并返回RDD[String],因此,我们需要在Spark中编写额外的代码,通过使用分隔符分割字符串记录将RDD[String]转换为RDD[Array[String]]。
下面的例子将一个文件读入RDD对象,RDD中的每个元素都表示为一个字符串。但是,我们需要CSV中的每条记录用逗号分隔符分割,并将其作为多列存储在RDD中。为了实现这一点,我们应该在RDD上使用map()转换,通过用逗号分隔符分割每条记录,将RDD[String]转换为RDD[Array[String]。map()方法返回新的RDD,而不是更新现有的RDD。
// 定义文件路径
val file1 = "file:///home/hduser/data/spark/files3/text01.csv"
// 加载文件到RDD
val rdd1 = spark.sparkContext.textFile(file1)
// 对RDD转换,分割每个元素字符串
val rdd2 = rdd1.map(_.split(","))
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))
执行以上代码,输出内容如下:
列1:Col1, 列2:Col2 列1:one, 列2:1 列1:Eleven, 列2:11
在本例中,collect()方法返回Array[Array[String]]类型,第一个数组表示RDD数据,内部数组是一条记录。
注意,我们从上面的“println”得到的输出也包含来自CSV文件的标题名称,因为标题行在RDD中被视为数据本身。我们需要在处理数据时跳过标题。
因为在RDD中没有办法指定文件有标题,所以可以用以下方法来处理:
// 定义文件路径
val file1 = "file:///home/hduser/data/spark/files3/text01.csv"
// 加载文件到RDD
val rdd1 = spark.sparkContext.textFile(file1)
// 转换RDD
val rdd2 = rdd1
.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } // 删除标题行
.map(_.split(",")) // 分割字符串
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))
执行以上代码,输出内容如下:
列1:one, 列2:1 列1:Eleven, 列2:11
读取多个CSV文件到RDD
要在Spark中读取多个CSV文件,只需在SparkContext对象上使用textFile()方法,通过传递所有文件名(以逗号分隔)。下面的示例将text01.csv和text02.csv文件读取到单个RDD中。
// 定义文件路径
val file1 = "file:///home/hduser/data/spark/files3/text01.csv"
val file2 = "file:///home/hduser/data/spark/files3/text02.csv"
// 加载文件到RDD,多个文件名以逗号分隔
val rdd1 = spark.sparkContext.textFile(s"$file1,$file2")
// 转换RDD
val rdd2 = rdd1
.mapPartitionsWithIndex { (idx, iter) => iter.drop(1)} // 删除标题行
.map(_.split(",")) // 分割字符串
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))
执行以上代码,输出内容如下:
列1:one, 列2:1 列1:Eleven, 列2:11 列1:two, 列2:2 列1:Twenty One, 列2:21
读取一个目录下所有CSV文件到RDD
要读取目录或文件夹中的所有CSV文件,只需将目录路径传递给testFile()方法即可。
// 定义要读取的目录,使用通配符*
val fileDir = "file:///home/hduser/data/spark/files3/*"
// 加载目录下所有文件到RDD
val rdd1 = spark.sparkContext.textFile(fileDir)
// 转换RDD
val rdd2 = rdd1.map(_.split(",")) // 分割字符串
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))
执行以上代码,输出内容如下:
列1:Col1, 列2:Col2 列1:invalid, 列2:i 列1:Col1, 列2:Col2 列1:one, 列2:1 列1:Eleven, 列2:11 列1:Col1, 列2:Col2 列1:two, 列2:2 列1:Twenty One, 列2:21 列1:Col1, 列2:Col2 列1:three, 列2:3 列1:Col1, 列2:Col2 列1:four, 列2:4
课程章节 返回课程首页
-
Ch01 Spark架构与集群搭建
-
Ch02 开发和部署Spark程序
-
Ch03 Spark核心编程
-
Ch04 Spark SQL编程
-
Ch05 Spark SQL编程(高级)
-
Ch06 Spark Streaming流处理
-
Ch07 Spark结构化流处理
-
ch08 Spark结构化流(高级)
-
综合项目实训