示例_数据缺失值处理

下面是一个简单的数据清洗的示例,演示了使用Spark SQL DataFrame API对数据集进行缺失值填充和删除的方法。

首先构造一个带缺失值的Dataset:

// case class
case class Author (name: String, dynasty: String, dob: String)

// 构造一个带有缺失值的数据集
val authors = Seq(Author("曹雪芹","清代","1724年"),
                Author("施耐庵","元末明初","1296年"),
                Author("罗贯中","元末明初",null),
                Author("吴承恩",null,null)
               )

val ds1 = sc.parallelize(authors).toDS()
ds1.show()

输出结果如下所示:

删除带有缺失值的行:

ds1.na.drop().show()

输出结果如下所示:

删除带有至少两个缺失值的行:

ds1.na.drop(minNonNulls=2).show()

输出结果如下所示:

注意,没有直接删除至少n个缺失值的行的scala函数,不过可以删除包含低于指定的非null值的行,从而取得相同的结果:

ds1.na.drop(minNonNulls = ds1.columns.length - 1).show()

输出结果如下所示:

使用一个给定的字符串填充所有缺失的值:

ds1.na.fill("不详").show()

输出结果如下所示:

也可以在每一列使用一个给定的字符串填充缺失值:

ds1.na.fill(Map("dynasty"->"--", "dob"->"不详")).show()

输出结果如下所示:

删除重复的值:

// 重用Author case class
// 构造另一个Dataset
val authors = Seq(Author("曹雪芹","清代","1724年"),
                  Author("曹雪芹","清代","1724年"),
                  Author("施耐庵","元末明初","1296年"),
                  Author("曹雪芹","清朝","1724年"),
                  Author("罗贯中","元末明初",null),
                  Author("吴承恩",null,null)
                )

val ds1 = sc.parallelize(authors).toDS()
ds1.show()

输出结果如下所示:

删除完全重复的行:

ds1.dropDuplicates().show()

输出结果如下所示:

删除指定的列重复的行(基于一个列的子集删除重复):

ds1.dropDuplicates("name").show()

输出结果如下所示:

删除指定的多个列重复的行(基于一个子集删除重复):

ds1.dropDuplicates(Array("dynasty","dob")).show()

输出结果如下所示:


《Flink原理深入与编程实战》