发布日期:2023-04-06 VIP内容

示例_数据整合、清洗与转换

原始数据通常是混乱的,需要进行一系列转换才能用于建模和分析工作。这样的数据集可能有丢失的数据、重复的记录、损坏的数据、不完整的记录等等。而数据清理,是把原始数据转换成可用的格式。在大多数项目中,这是最具挑战性和最耗时的一步。

数据清理阶段是一个非常重要的阶段,不仅对于算法来说是正确的,而且还可以让我们更好地理解我们的数据,这样我们就可以在实现算法的同时采取正确的方法。数据处理是执行机器所必需的关键步骤。我们需要对数据进行清洗、筛选、合并和转换,以将其转换为所需的形式,从而能够训练机器学习模型。

下面将通过示例演示在Spark中如何实现:

数据整合

一旦数据从不同的来源获得,接一来就是将它们全部合并,以便将数据作为一个整体进行清理、格式化,并转换为分析所需的格式。

在本节中,我们将讨论如何组合从各种数据源获得的数据。为了更好地理解数据获取和准备阶段,这里我们假设这样的场景:员工数据分散存储在本地的RDD、JSON文件和关系型数据库中。

1、数据加载:下面演示了从不同数据源(内存集合、文件等)加载数据到Dataset中。

// 创建一个RDD并转换为DataFrame
val employeesDF = sc.parallelize(List((1, "陈柯宇", 25), (2, "陶心瑶", 35),(3, "楼一萱", 24), 
                                    (4, "张希", 28), (5, "王心凌", 26), (6, "庄妮", 35), 
                                    (7,"何洁", 38), (8, "成方圆", 32), (9, "孙玉", 29), 
                                    (10, "刘珂矣", 29),(11, "林忆莲", 28), (12, "蓝琪儿", 25), 
                                    (13, "白安",31))).toDF("emp_id","name","age")
employeesDF.show()

// 加载json格式的数据源文件
val salaryFilePath = "/data/spark_demo/data_analyze/salary.json"
val salaryDF = spark.read.json(salaryFilePath)
salaryDF.show()

val designationFilePath = "/data/spark_demo/data_analyze/designation.json"
val designationDF = spark.read.json(designationFilePath)
designationDF.show()

执行以上代码,输出如下:

2、数据整合:组合从各种数据源获得的数据。

val final_data = employeesDF.join(salaryDF,$"emp_id"===$"e_id").
                             join(designationDF,$"emp_id"===$"id").
                             select("emp_id","name","age","role","salary")
final_data.cache
final_data.show

在集成了来自这些数据源的数据之后,最终的数据集(在本例中是final_data)应该是以下格式:

数据清洗

一旦把数据整合到一起,就必须花时间和精力去整理它,然后再分析它。数据可能有各种各样的问题,这里我们将处理一些常见的情况,比如缺失值、重复值、转换或格式化(从数字中添加或删除数字,将一个列分割成两个,合并两个列到一个)。

1、缺失值处理

处理缺失值有很多种方法。一种方法是删除包含缺失值的行。有可能是某一行仅某个列有缺失值,我们就要把该行删除。或者对于不同的列可能有不同的策略,比如,只要该行缺失值的总数在一个阈值之下,我们就保留这一行。还有一种方法可能是用一个常量值替换null值,比如数值变量的平均值。

所以总结来说,缺失值的处理有三种方式:

  • 删除:df.na.drop(),等价于df.na.drop("any")。如果是df.na.drop("all"),则意思是删除所有列都有null值的行。另外也可以指定列名,例如,df.na.drop(Array("列名"))。
  • 填充:使用fill函数,将null或NaN值替换为一个常量。df.na.fill(Map("列名"->填充值))。
  • 替换:df.na.replace(Array("列名1","列名2"),Map(旧值 -> 新值))。

请看下面的示例:

// 1、缺失值处理
// 可以删除带有缺失值的行
var clean_data = final_data.na.drop()
clean_data.show

输出结果如下所示:

也可以使用平均值替换缺失值:

val mean_salary = final_data.select(floor(avg("salary"))).first()(0).toString.toDouble
clean_data = final_data.na.fill(Map("salary" -> mean_salary))
clean_data.show

输出结果如下所示:

2、异常值处理

异常值也称为离群值。简单地说,一个离群值是一个数据点,它与其他数据点没有相同的特征。此外,也可以有单变量的异常值,也可以有多变量的异常值。(这里主要讨论单变量异常值)

为了处理异常值,必须首先查看是否有异常值。发现和找到异常值的方法有多种,例如汇总统计和绘图技术。一旦找到了离群值,就可以删除包含离群值的行,或者使用平均值来替换异常值,或者根据自己的情况做一些更相关的事情。下面让我们看一下均值替代方法:

// 2、异常值处理:删除包含离群值的行,或者均值替代方法
// 识别异常值并用均值替代
// 计算每一行的偏差
val devs = clean_data.select((($"salary" - mean_salary) * ($"salary" - mean_salary)).alias("deviation"))

// 计算标准偏差
val stddev = devs.select(sqrt(avg("deviation"))).first().getDouble(0)

// 用平均工资替换超过2个标准差范围内的异常值(UDF)
val outlierfunc = udf((value: Long, mean: Double) => {
   if (value > mean+(2*stddev) || value < mean-(2*stddev)) 
       mean 
   else 
       value
})  
val no_outlier = clean_data.withColumn("updated_salary",outlierfunc(col("salary"),lit(mean_salary)))

// 观察修改后的值
no_outlier.filter($"salary" =!= $"updated_salary").show()

输出结果如下所示:

3、重复值处理

在数据集中处理重复记录有不同的方法。我们将在以下代码片段中演示这些方法:

// 3、重复值处理
// 删除重复的行
// val no_outlier_no_duplicates = no_outlier.dropDuplicates()
// no_outlier_no_duplicates.show

// 也可基于某列的子集删除重复的行
val test_df = no_outlier.dropDuplicates("role").show

输出结果如下所示:

数据转换

存在有各种各样的数据转换需求,这里我们将讨论一些基本类型的转换,如下所示:

  • 将两列合并成一列
  • 将字符/数字添加到现有的字符/数字
  • 从现有的字符/数字中删除或替换字符/数字
  • 更改日期的格式

下面的程序代码演示了如何对数据进行一些基本的转换:

// 合并列
// 创建一个udf来连接两个列值
val concatfunc = udf((name: String, age: Integer) => {name + "_" + age})

// 应用该udf来创建合并的列
val concat_df = final_data.withColumn("name_age",concatfunc($"name", $"age"))

// 显示 
concat_df.show

输出结果如下所示:

向数据添加常量:

// 注册一个UDF,功能是将年龄增加10岁
val addconst = udf((age:Integer) => {age + 10})

// 应用UDF
val data_new = concat_df.withColumn("age_incremented",addconst(col("age")))
data_new.show

输出结果如下所示:

替换一个列中的值:

final_data.na.replace("role",Map("合伙人" -> "同事")).show

输出结果如下所示:

如果在replace中列名参数是"*",那么替换应用到所有的列。

基于一个列中已经存在的值追加新的列:

concat_df.withColumn("age",addconst(col("age"))).show

输出结果如下所示:

现在我们已经熟悉了一些基本的例子,让我们来看一个比较复杂的例子。

在这里,我们看到的是日期列有许多不同日期格式的数据点的情况。我们需要将所有不同的日期格式标准化成一种格式。要做到这一点,我们首先必须创建一个用户定义的函数(udf),它可以处理不同的格式,并将它们转换为一种通用格式。

代码实现如下所示:

// 日期转换
// 构造数据集
case class Book (title: String, author: String, pubtime: String)
val books = Seq(Book("浮生六记","[清]沈复","2018/7/1"),
                  Book("云边有个小卖部","张嘉佳","2018/07/12"),
                  Book("菊与刀","[美]本尼迪克特",null),
                  Book("苏菲的世界","乔斯坦·贾德","2017,10 12"),
                  Book("罗生门",null,null)
                )

val ds1 = sc.parallelize(books).toDS()
  
// 定义udf: 将传入的字符串转换成YYYY-MM-DD格式
def toDateUDF = udf((s: String) => {
    var (year, month, day) = ("","","")

    // 格式化日期
    if(s != null) {
        var x = s.split(" |/|,")    // 拆分
        year = "%04d".format(x(0).toInt)
        month = "%02d".format(x(1).toInt)
        day = "%02d".format(x(2).toInt)

        year + "-" + month + "-" + day
    } else {
        null
    }
})

// 应用udf并将日期字符串转换为标准的形式YYYY-MM-DD
ds1.withColumn("pubtime",toDateUDF(ds1("pubtime"))).show

输出结果如下所示: