示例:读取多个文本文件到单个RDD中
弹性分布式数据集(RDD)是Spark的基本数据结构,它是一个不可变的分布式对象集合。RDD中的每个数据集被划分为多个逻辑分区,这些逻辑分区可以在集群的不同节点上进行计算。
Spark Core在SparkContext类中提供了textFile()和wholeTextFiles()方法,用于将单个和多个文本或csv文件读取到单个Spark RDD中。使用这种方法,我们还可以从目录和具有特定模式的多文件中读取所有文件。
- textFile():读取单个或多个文本文件、csv文件,并返回单个Spark RDD [String]。
- wholeTextFiles():读取单个或多个文件,并返回单个RDD[Tuple2[String, String]],其中元组中的第一个值(_1)是文件名,第二个值(_2)是文件的内容。
接下来,让我们看一些读取多个文本文件到单个RDD中的示例。
- 将目录中的所有文本文件读入单个RDD
- 将多个文本文件读取到单个RDD中
- 将所有匹配模式的文本文件读取到单个RDD
- 从多个目录中读取文件到单个RDD
- 从嵌套目录中读取文本文件到单个RDD
- 单独读取所有文本文件并合并以创建单个RDD
- 读取多个CSV文件
假设在文件夹~/data/spark/files2中有以下文件名和文件内容,我们将使用这些文件来演示示例。
| 文件名 | 文件内容 |
|---|---|
| text01.txt | one,1 |
| text02.txt | two,2 |
| text03.txt | three,3 |
| text04.txt | four,4 |
| invalid.txt | invalid,I |
将目录中的所有文本文件读入单个RDD
在Spark中,通过向textFile()方法输入目录路径,读取所有文本文件并创建单个RDD。确保没有嵌套目录,否则Spark进程会失败并出现错误。
// 文件目录 val filesDir = "file:///home/hduser/data/spark/files2/*" // 读取目录下所有文件到单个RDD val rdd = spark.sparkContext.textFile(filesDir) // 在driver端遍历 rdd.collect.foreach(println)
执行以上代码,输出内容如下:
invalid,I one,1 two,2 three,3 four,4
让我们看一个使用wholeTextFiles()方法的类似示例。注意,这返回一个RDD[Tuple2]。其中元组中的第一个值(_1)是文件名,第二个值(_2)是文件的内容。
// 文件目录
val filesDir = "file:///home/hduser/data/spark/files2/*"
// 读取目录下所有文件到单个RDD
val rddWhole = spark.sparkContext.wholeTextFiles(filesDir)
rddWhole.collect.foreach(f => {
print(f._1 + "=>" + f._2)
})
执行以上代码,输出内容如下:
file:/home/hduser/data/spark/files2/invalid.txt=>invalid,I file:/home/hduser/data/spark/files2/text01.txt=>one,1 file:/home/hduser/data/spark/files2/text02.txt=>two,2 file:/home/hduser/data/spark/files2/text03.txt=>three,3 file:/home/hduser/data/spark/files2/text04.txt=>four,4
将多个文本文件读取到单个RDD中
当知道要读取的多个文件的名称时,只需用逗号分隔输入所有文件名,就可以创建单个RDD。
// 文件目录 val filesBase = "file:///home/hduser/data/spark/files2/" // 多个文件名 val file1 = filesBase + "text01.txt" val file2 = filesBase + "text02.txt" // 读取多个文本文件 val rdd2 = spark.sparkContext.textFile(s"$file1,$file2") // 注意,逗号前后不要有空格 rdd2.collect.foreach(println)
执行以上代码,输出内容如下:
one,1 two,2
将所有匹配模式的文本文件读取到单个RDD
textFile()方法也接受模式匹配和通配字符。例如,下面的代码片段读取所有以text开头并带有.txt扩展名的文件,并创建单个RDD。
// 文件目录 val filesDir = "file:///home/hduser/data/spark/files2/text*.txt" // 按匹配模式读取文件到单个RDD val rdd3 = spark.sparkContext.textFile(filesDir) rdd3.collect.foreach(println)
执行以上代码,输出内容如下:
one,1 two,2 three,3 four,4
从多个目录中读取文件到单个RDD
textFile()方法还支持读取文件和多目录组合。
// 文件目录 val filesDir1 = "file:///home/hduser/data/spark/files/*" val filesDir2 = "file:///home/hduser/data/spark/files2/*" val filePath = "file:///home/hduser/data/spark/files2/text01.txt" // 从多个目录读取文件到单个RDD val rdd4 = spark.sparkContext.textFile(s"$filesDir1,$filesDir2,$filePath") rdd4.collect.foreach(println)
执行以上代码,输出内容如下:
good good study day day up to be or not to be,this is a question. invalid,I one,1 two,2 three,3 four,4 one,1
从嵌套目录中读取文本文件到单个RDD
textFile()和wholeTextFile()在找到一个嵌套文件夹时返回一个错误,因此,首先使用scala、Java或Python语言通过遍历所有嵌套文件夹创建一个文件路径列表,并通过逗号分隔符传递所有文件名以创建单个RDD。
单独读取所有文本文件并合并以创建单个RDD
还可以将所有文本文件分别读入各自单独的RDD,然后将所有这些RDD合并为一个RDD。
读取多个CSV文件
Spark RDD没有读取csv文件格式的方法,因此我们将使用textFile()方法将csv文件像任何其他文本文件一样读取到RDD中,并基于逗号、管道或任何其他分隔符分割记录。
// 文件目录
val filesDir = "file:///home/hduser/data/spark/files2/*"
// 从多个目录读取文件到单个RDD
val rdd5 = spark.sparkContext.textFile(filesDir)
// 使用分隔符分割
val rdd6 = rdd5.map(row => row.split(","))
rdd6.collect.foreach(r => println(r(0) + "," + r(1)))
执行以上代码,输出内容如下:
invalid,I one,1 two,2 three,3 four,4