发布日期:2023-06-15 VIP内容

示例:读取多个文本文件到单个RDD中

弹性分布式数据集(RDD)是Spark的基本数据结构,它是一个不可变的分布式对象集合。RDD中的每个数据集被划分为多个逻辑分区,这些逻辑分区可以在集群的不同节点上进行计算。

Spark Core在SparkContext类中提供了textFile()和wholeTextFiles()方法,用于将单个和多个文本或csv文件读取到单个Spark RDD中。使用这种方法,我们还可以从目录和具有特定模式的多文件中读取所有文件。

  • textFile():读取单个或多个文本文件、csv文件,并返回单个Spark RDD [String]。
  • wholeTextFiles():读取单个或多个文件,并返回单个RDD[Tuple2[String, String]],其中元组中的第一个值(_1)是文件名,第二个值(_2)是文件的内容。

接下来,让我们看一些读取多个文本文件到单个RDD中的示例。

假设在文件夹~/data/spark/files2中有以下文件名和文件内容,我们将使用这些文件来演示示例。

文件名 文件内容
text01.txt one,1
text02.txt two,2
text03.txt three,3
text04.txt four,4
invalid.txt invalid,I

将目录中的所有文本文件读入单个RDD

在Spark中,通过向textFile()方法输入目录路径,读取所有文本文件并创建单个RDD。确保没有嵌套目录,否则Spark进程会失败并出现错误。

// 文件目录
val filesDir = "file:///home/hduser/data/spark/files2/*"

// 读取目录下所有文件到单个RDD
val rdd = spark.sparkContext.textFile(filesDir)

// 在driver端遍历 
rdd.collect.foreach(println)

执行以上代码,输出内容如下:

invalid,I
one,1
two,2
three,3
four,4

让我们看一个使用wholeTextFiles()方法的类似示例。注意,这返回一个RDD[Tuple2]。其中元组中的第一个值(_1)是文件名,第二个值(_2)是文件的内容。

// 文件目录
val filesDir = "file:///home/hduser/data/spark/files2/*"

// 读取目录下所有文件到单个RDD
val rddWhole = spark.sparkContext.wholeTextFiles(filesDir)

rddWhole.collect.foreach(f => {
    print(f._1 + "=>" + f._2)
})  

执行以上代码,输出内容如下:

file:/home/hduser/data/spark/files2/invalid.txt=>invalid,I
file:/home/hduser/data/spark/files2/text01.txt=>one,1
file:/home/hduser/data/spark/files2/text02.txt=>two,2
file:/home/hduser/data/spark/files2/text03.txt=>three,3
file:/home/hduser/data/spark/files2/text04.txt=>four,4

将多个文本文件读取到单个RDD中

当知道要读取的多个文件的名称时,只需用逗号分隔输入所有文件名,就可以创建单个RDD。

// 文件目录
val filesBase = "file:///home/hduser/data/spark/files2/"

// 多个文件名
val file1 = filesBase + "text01.txt"
val file2 = filesBase + "text02.txt"

// 读取多个文本文件
val rdd2 = spark.sparkContext.textFile(s"$file1,$file2")  // 注意,逗号前后不要有空格

rdd2.collect.foreach(println)

执行以上代码,输出内容如下:

one,1
two,2

将所有匹配模式的文本文件读取到单个RDD

textFile()方法也接受模式匹配和通配字符。例如,下面的代码片段读取所有以text开头并带有.txt扩展名的文件,并创建单个RDD。

// 文件目录
val filesDir = "file:///home/hduser/data/spark/files2/text*.txt"

// 按匹配模式读取文件到单个RDD
val rdd3 = spark.sparkContext.textFile(filesDir)

rdd3.collect.foreach(println)  

执行以上代码,输出内容如下:

one,1
two,2
three,3
four,4

从多个目录中读取文件到单个RDD

textFile()方法还支持读取文件和多目录组合。

// 文件目录
val filesDir1 = "file:///home/hduser/data/spark/files/*"
val filesDir2 = "file:///home/hduser/data/spark/files2/*"
val filePath = "file:///home/hduser/data/spark/files2/text01.txt"

// 从多个目录读取文件到单个RDD
val rdd4 = spark.sparkContext.textFile(s"$filesDir1,$filesDir2,$filePath")

rdd4.collect.foreach(println)   

执行以上代码,输出内容如下:

good good study
day day up
to be or not to be,this is a question.
invalid,I
one,1
two,2
three,3
four,4
one,1

从嵌套目录中读取文本文件到单个RDD

textFile()和wholeTextFile()在找到一个嵌套文件夹时返回一个错误,因此,首先使用scala、Java或Python语言通过遍历所有嵌套文件夹创建一个文件路径列表,并通过逗号分隔符传递所有文件名以创建单个RDD。

单独读取所有文本文件并合并以创建单个RDD

还可以将所有文本文件分别读入各自单独的RDD,然后将所有这些RDD合并为一个RDD。

读取多个CSV文件

Spark RDD没有读取csv文件格式的方法,因此我们将使用textFile()方法将csv文件像任何其他文本文件一样读取到RDD中,并基于逗号、管道或任何其他分隔符分割记录。

// 文件目录
val filesDir = "file:///home/hduser/data/spark/files2/*"

// 从多个目录读取文件到单个RDD
val rdd5 = spark.sparkContext.textFile(filesDir)

// 使用分隔符分割
val rdd6 = rdd5.map(row => row.split(","))

rdd6.collect.foreach(r => println(r(0) + "," + r(1)))   

执行以上代码,输出内容如下:

invalid,I
one,1
two,2
three,3
four,4