示例_数据整合、清洗与转换
原始数据通常是混乱的,需要进行一系列转换才能用于建模和分析工作。这样的数据集可能有丢失的数据、重复的记录、损坏的数据、不完整的记录等等。而数据清理,是把原始数据转换成可用的格式。在大多数项目中,这是最具挑战性和最耗时的一步。
数据清理阶段是一个非常重要的阶段,不仅对于算法来说是正确的,而且还可以让我们更好地理解我们的数据,这样我们就可以在实现算法的同时采取正确的方法。数据处理是执行机器所必需的关键步骤。我们需要对数据进行清洗、筛选、合并和转换,以将其转换为所需的形式,从而能够训练机器学习模型。
下面将通过示例演示在Spark中如何实现:
数据整合
一旦数据从不同的来源获得,接一来就是将它们全部合并,以便将数据作为一个整体进行清理、格式化,并转换为分析所需的格式。
在本节中,我们将讨论如何组合从各种数据源获得的数据。为了更好地理解数据获取和准备阶段,这里我们假设这样的场景:员工数据分散存储在本地的RDD、JSON文件和关系型数据库中。
1、数据加载:下面演示了从不同数据源(内存集合、文件等)加载数据到Dataset中。
// 创建一个RDD并转换为DataFrame
val employeesDF = sc.parallelize(List((1, "陈柯宇", 25), (2, "陶心瑶", 35),(3, "楼一萱", 24),
(4, "张希", 28), (5, "王心凌", 26), (6, "庄妮", 35),
(7,"何洁", 38), (8, "成方圆", 32), (9, "孙玉", 29),
(10, "刘珂矣", 29),(11, "林忆莲", 28), (12, "蓝琪儿", 25),
(13, "白安",31))).toDF("emp_id","name","age")
employeesDF.show()
// 加载json格式的数据源文件
val salaryFilePath = "/data/spark_demo/data_analyze/salary.json"
val salaryDF = spark.read.json(salaryFilePath)
salaryDF.show()
val designationFilePath = "/data/spark_demo/data_analyze/designation.json"
val designationDF = spark.read.json(designationFilePath)
designationDF.show()
执行以上代码,输出如下:
2、数据整合:组合从各种数据源获得的数据。
val final_data = employeesDF.join(salaryDF,$"emp_id"===$"e_id").
join(designationDF,$"emp_id"===$"id").
select("emp_id","name","age","role","salary")
final_data.cache
final_data.show
在集成了来自这些数据源的数据之后,最终的数据集(在本例中是final_data)应该是以下格式:
数据清洗
一旦把数据整合到一起,就必须花时间和精力去整理它,然后再分析它。数据可能有各种各样的问题,这里我们将处理一些常见的情况,比如缺失值、重复值、转换或格式化(从数字中添加或删除数字,将一个列分割成两个,合并两个列到一个)。
1、缺失值处理
处理缺失值有很多种方法。一种方法是删除包含缺失值的行。有可能是某一行仅某个列有缺失值,我们就要把该行删除。或者对于不同的列可能有不同的策略,比如,只要该行缺失值的总数在一个阈值之下,我们就保留这一行。还有一种方法可能是用一个常量值替换null值,比如数值变量的平均值。
所以总结来说,缺失值的处理有三种方式:
- 删除:df.na.drop(),等价于df.na.drop("any")。如果是df.na.drop("all"),则意思是删除所有列都有null值的行。另外也可以指定列名,例如,df.na.drop(Array("列名"))。
- 填充:使用fill函数,将null或NaN值替换为一个常量。df.na.fill(Map("列名"->填充值))。
- 替换:df.na.replace(Array("列名1","列名2"),Map(旧值 -> 新值))。
请看下面的示例:
// 1、缺失值处理 // 可以删除带有缺失值的行 var clean_data = final_data.na.drop() clean_data.show
输出结果如下所示:
也可以使用平均值替换缺失值:
val mean_salary = final_data.select(floor(avg("salary"))).first()(0).toString.toDouble
clean_data = final_data.na.fill(Map("salary" -> mean_salary))
clean_data.show
输出结果如下所示:
2、异常值处理
异常值也称为离群值。简单地说,一个离群值是一个数据点,它与其他数据点没有相同的特征。此外,也可以有单变量的异常值,也可以有多变量的异常值。(这里主要讨论单变量异常值)
为了处理异常值,必须首先查看是否有异常值。发现和找到异常值的方法有多种,例如汇总统计和绘图技术。一旦找到了离群值,就可以删除包含离群值的行,或者使用平均值来替换异常值,或者根据自己的情况做一些更相关的事情。下面让我们看一下均值替代方法:
// 2、异常值处理:删除包含离群值的行,或者均值替代方法
// 识别异常值并用均值替代
// 计算每一行的偏差
val devs = clean_data.select((($"salary" - mean_salary) * ($"salary" - mean_salary)).alias("deviation"))
// 计算标准偏差
val stddev = devs.select(sqrt(avg("deviation"))).first().getDouble(0)
// 用平均工资替换超过2个标准差范围内的异常值(UDF)
val outlierfunc = udf((value: Long, mean: Double) => {
if (value > mean+(2*stddev) || value < mean-(2*stddev))
mean
else
value
})
val no_outlier = clean_data.withColumn("updated_salary",outlierfunc(col("salary"),lit(mean_salary)))
// 观察修改后的值
no_outlier.filter($"salary" =!= $"updated_salary").show()
输出结果如下所示:
3、重复值处理
在数据集中处理重复记录有不同的方法。我们将在以下代码片段中演示这些方法:
// 3、重复值处理
// 删除重复的行
// val no_outlier_no_duplicates = no_outlier.dropDuplicates()
// no_outlier_no_duplicates.show
// 也可基于某列的子集删除重复的行
val test_df = no_outlier.dropDuplicates("role").show
输出结果如下所示:
数据转换
存在有各种各样的数据转换需求,这里我们将讨论一些基本类型的转换,如下所示:
- 将两列合并成一列
- 将字符/数字添加到现有的字符/数字
- 从现有的字符/数字中删除或替换字符/数字
- 更改日期的格式
下面的程序代码演示了如何对数据进行一些基本的转换:
// 合并列
// 创建一个udf来连接两个列值
val concatfunc = udf((name: String, age: Integer) => {name + "_" + age})
// 应用该udf来创建合并的列
val concat_df = final_data.withColumn("name_age",concatfunc($"name", $"age"))
// 显示
concat_df.show
输出结果如下所示:
向数据添加常量:
// 注册一个UDF,功能是将年龄增加10岁
val addconst = udf((age:Integer) => {age + 10})
// 应用UDF
val data_new = concat_df.withColumn("age_incremented",addconst(col("age")))
data_new.show
输出结果如下所示:
替换一个列中的值:
final_data.na.replace("role",Map("合伙人" -> "同事")).show
输出结果如下所示:
如果在replace中列名参数是"*",那么替换应用到所有的列。
基于一个列中已经存在的值追加新的列:
concat_df.withColumn("age",addconst(col("age"))).show
输出结果如下所示:
现在我们已经熟悉了一些基本的例子,让我们来看一个比较复杂的例子。
在这里,我们看到的是日期列有许多不同日期格式的数据点的情况。我们需要将所有不同的日期格式标准化成一种格式。要做到这一点,我们首先必须创建一个用户定义的函数(udf),它可以处理不同的格式,并将它们转换为一种通用格式。
代码实现如下所示:
// 日期转换
// 构造数据集
case class Book (title: String, author: String, pubtime: String)
val books = Seq(Book("浮生六记","[清]沈复","2018/7/1"),
Book("云边有个小卖部","张嘉佳","2018/07/12"),
Book("菊与刀","[美]本尼迪克特",null),
Book("苏菲的世界","乔斯坦·贾德","2017,10 12"),
Book("罗生门",null,null)
)
val ds1 = sc.parallelize(books).toDS()
// 定义udf: 将传入的字符串转换成YYYY-MM-DD格式
def toDateUDF = udf((s: String) => {
var (year, month, day) = ("","","")
// 格式化日期
if(s != null) {
var x = s.split(" |/|,") // 拆分
year = "%04d".format(x(0).toInt)
month = "%02d".format(x(1).toInt)
day = "%02d".format(x(2).toInt)
year + "-" + month + "-" + day
} else {
null
}
})
// 应用udf并将日期字符串转换为标准的形式YYYY-MM-DD
ds1.withColumn("pubtime",toDateUDF(ds1("pubtime"))).show
输出结果如下所示: