发布日期:2023-06-15 VIP内容

示例:加载CSV文件到RDD中

使用SparkContext类中的textFile()方法,我们可以读取单个CSV文件、多个CSV文件(基于模式匹配)、或者从一个目录中读取所有文件到RDD[String]对象。

在开始之前,让我们假设在文件夹~/data/spark/files3中有以下文件名和文件内容,文件内容以逗号分隔,我们将使用这些文件来演示示例。

文件名 文件内容
text01.csv Col1,Col2
one,1
Eleven,11
text02.csv Col1,Col2
two,2
Twenty One,21
text03.csv Col1,Col2
three,3
text04.csv Col1,Col2
four,4
invalid.csv Col1,Col2
invalid,I

点击以下链接,快速浏览相应内容:

读取单个CSV文件到RDD

SparkContext的textFile()方法将整个CSV记录作为字符串读取并返回RDD[String],因此,我们需要在Spark中编写额外的代码,通过使用分隔符分割字符串记录将RDD[String]转换为RDD[Array[String]]。

下面的例子将一个文件读入RDD对象,RDD中的每个元素都表示为一个字符串。但是,我们需要CSV中的每条记录用逗号分隔符分割,并将其作为多列存储在RDD中。为了实现这一点,我们应该在RDD上使用map()转换,通过用逗号分隔符分割每条记录,将RDD[String]转换为RDD[Array[String]。map()方法返回新的RDD,而不是更新现有的RDD。

// 定义文件路径
val file1 = "file:///home/hduser/data/spark/files3/text01.csv" 

// 加载文件到RDD
val rdd1 = spark.sparkContext.textFile(file1)

// 对RDD转换,分割每个元素字符串
val rdd2 = rdd1.map(_.split(","))

// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))

执行以上代码,输出内容如下:

列1:Col1, 列2:Col2
列1:one, 列2:1
列1:Eleven, 列2:11

在本例中,collect()方法返回Array[Array[String]]类型,第一个数组表示RDD数据,内部数组是一条记录。

注意,我们从上面的“println”得到的输出也包含来自CSV文件的标题名称,因为标题行在RDD中被视为数据本身。我们需要在处理数据时跳过标题。

因为在RDD中没有办法指定文件有标题,所以可以用以下方法来处理:

// 定义文件路径
val file1 = "file:///home/hduser/data/spark/files3/text01.csv" 

// 加载文件到RDD
val rdd1 = spark.sparkContext.textFile(file1)

// 转换RDD
val rdd2 = rdd1
    .mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }    // 删除标题行
    .map(_.split(","))  // 分割字符串
    
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))

执行以上代码,输出内容如下:

列1:one, 列2:1
列1:Eleven, 列2:11

读取多个CSV文件到RDD

要在Spark中读取多个CSV文件,只需在SparkContext对象上使用textFile()方法,通过传递所有文件名(以逗号分隔)。下面的示例将text01.csv和text02.csv文件读取到单个RDD中。

// 定义文件路径
val file1 = "file:///home/hduser/data/spark/files3/text01.csv" 
val file2 = "file:///home/hduser/data/spark/files3/text02.csv" 

// 加载文件到RDD,多个文件名以逗号分隔
val rdd1 = spark.sparkContext.textFile(s"$file1,$file2")

// 转换RDD
val rdd2 = rdd1
    .mapPartitionsWithIndex { (idx, iter) => iter.drop(1)}    // 删除标题行
    .map(_.split(","))  // 分割字符串
    
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1)))

执行以上代码,输出内容如下:

列1:one, 列2:1
列1:Eleven, 列2:11
列1:two, 列2:2
列1:Twenty One, 列2:21

读取一个目录下所有CSV文件到RDD

要读取目录或文件夹中的所有CSV文件,只需将目录路径传递给testFile()方法即可。

// 定义要读取的目录,使用通配符*
val fileDir = "file:///home/hduser/data/spark/files3/*" 

// 加载目录下所有文件到RDD
val rdd1 = spark.sparkContext.textFile(fileDir)

// 转换RDD
val rdd2 = rdd1.map(_.split(","))  // 分割字符串
    
// 将rdd收集回driver端,并遍历
rdd2.collect.foreach(arr => println("列1:" + arr(0) + ", 列2:" + arr(1))) 

执行以上代码,输出内容如下:

列1:Col1, 列2:Col2
列1:invalid, 列2:i
列1:Col1, 列2:Col2
列1:one, 列2:1
列1:Eleven, 列2:11
列1:Col1, 列2:Col2
列1:two, 列2:2
列1:Twenty One, 列2:21
列1:Col1, 列2:Col2
列1:three, 列2:3
列1:Col1, 列2:Col2
列1:four, 列2:4