使用共享变量-累加器

累加器(Accumulators)是只能add的跨executors之间共享的变量。可以使用它们来实现spark job中的全局求和与计数。

可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()来创建数值累加器,以分别累积Long或Double类型的值。然后,可以使用add方法将运行在集群上的任务添加到集群中。但是,他们无法读取它的值。只有驱动程序可以读取累加器的值,使用它的value方法。下面是使用累加器的示例:

val acc = sc.longAccumulator("My Accumulator")
acc.value      // 0

val list = sc.parallelize(Array(1, 2, 3, 4))

// 在executors上执行
list.foreach(x => acc.add(x))

// 在driver上执行
acc.value      // 10

在Spark的执行模型中,只有当计算被触发(例如,由一个action)时,Spark才会添加累加器。

累加器应用示例

【示例】数据清洗示例。将下面数据集中任意关键字段为空的条目剔除,并以打印语句输出删除条目数;关键字段定义为{星级}。

数据集样本如下:

SEQ,酒店,国家,省份,城市,商圈,星级,业务部门,房间数,图片数,评分,评论数
aba_2066,马尔康嘉绒大酒店,中国,四川,阿坝,,四星级/高档,OTA,85,,4.143799782,108
aba_2069,阿坝马尔康县澜峰大酒店,中国,四川,阿坝,,,低星,115,,3.977930069,129
aba_2094,阿坝鑫鸿大酒店,中国,四川,阿坝,四姑娘山,二星及其他,低星,,,,
aba_2096,九寨沟管理局荷叶迎宾馆,中国,四川,阿坝,九寨沟沟口,二星及其他,低星,49,,3.972340107,394
aba_2097,九寨沟风景名胜区管理局贵宾楼饭店,中国,四川,阿坝,九寨沟沟口,三星级/舒适,低星,50,,4.12789011,585
aba_2098,九寨沟九鑫山庄,中国,四川,阿坝,九寨沟沟口,四星级/高档,OTA,60,,4.04046011,161
aba_2102,九寨沟冈拉美朵酒店,中国,四川,阿坝,九寨沟沟口,四星级/高档,OTA,198,,3.471659899,12
aba_2109,若尔盖大藏酒店古格王朝店,中国,四川,阿坝,西部旅游牧场,五星级/豪华,OTA,94,,3.263220072,62
aba_2111,若尔盖大藏酒店圣地店,中国,四川,阿坝,西部旅游牧场,四星级/高档,OTA,188,,3.921580076,119
aba_2117,九寨沟保利新九寨宾馆,中国,四川,阿坝,漳扎镇,五星级/豪华,OTA,329,,4.353809834,269
aba_2134,九寨沟名人酒店,中国,四川,阿坝,漳扎镇,三星级/舒适,低星,292,,4.539999962,57
aba_2150,九寨沟仁智度假酒店,中国,四川,阿坝,九寨沟沟口,三星级/舒适,低星,137,,3.782749891,173
aba_2152,九寨沟药泉山庄,中国,四川,阿坝,黄龙机场、川主寺,四星级/高档,OTA,128,,3.821099997,310
aba_2156,松潘黄龙寺华龙山庄,中国,四川,阿坝,黄龙风景区,四星级/高档,OTA,154,,3.315500021,107
aba_2213,阿坝山之旅背包客栈,中国,四川,阿坝,四姑娘山,二星及其他,客栈,20,,,13
aba_2217,阿坝若尔盖大酒店,中国,四川,阿坝,,二星及其他,低星,71,,4.267769814,2
aba_2233,九寨沟云天海大酒店,中国,四川,阿坝,九寨沟沟口,三星级/舒适,低星,100,,2.783930063,
aba_2243,阿坝九旅假日酒店,中国,四川,阿坝,九寨沟沟口,四星级/高档,OTA,140,,3.00515008,49
aba_2248,九寨沟川主寺岷江源大酒店,中国,四川,阿坝,黄龙机场、川主寺,,低星,228,,3.211859941,109

实现代码如下:

import org.apache.spark.sql.SparkSession

object AccDemo2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("accumulator").getOrCreate()

    // 定义计数器
    val starCounter = spark.sparkContext.longAccumulator("star_counter")

    // 加载外部数据源,构造RDD
    val rdd1 = spark.sparkContext.textFile("input/hotel/sample2.csv")

    // 过滤掉标题行
    val rdd2 = rdd1.filter(line => !line.startsWith("SEQ"))

    // 将关键字段有缺失值的记录删除:即将字段{星级、评论数、评分}中任意字段为空的数据删除
    // 并打印输出删除条目数 - 使用计数器统计
    val rdd3 = rdd2.map(_.split(",", -1))
      .filter(arr => {
        // 如果"星级"字段为空
        if(arr(6)==null || arr(6).trim.isEmpty){
          starCounter.add(1)    // 全局计数器 + 1
          false
        }else{
          true
        }
      })

    // 显示
    println(s"过滤前记录数${rdd2.count},过滤后记录后${rdd3.count}")
    println(s"删除的星级字段缺失的记录数是:${starCounter.value}")
  }
}

执行以上代码,输出结果如下:

过滤前记录数19,过滤后记录后17
删除的星级字段缺失的记录数是:2

如果我们把关键字段定义为{星级、评分、评论数},并分别统计每个字段的缺失值,则实现如下:

import org.apache.spark.sql.SparkSession

object AccDemo3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("accumulator").getOrCreate()

    // 定义计数器
    val starCounter = spark.sparkContext.longAccumulator("star_counter")
    val scoreCounter = spark.sparkContext.longAccumulator("score_counter")
    val commCounter = spark.sparkContext.longAccumulator("comm_counter")

    // 加载外部数据源,构造RDD
    val rdd1 = spark.sparkContext.textFile("input/hotel/sample2.csv")

    // 过滤掉标题行
    val rdd2 = rdd1.filter(line => !line.startsWith("SEQ"))

    // 将关键字段有缺失值的记录删除:即将字段{星级、评论数、评分}中任意字段为空的数据删除
    // 并打印输出删除条目数 - 使用计数器统计
    val rdd3 = rdd2.map(_.split(",",-1))
      .filter(arr => {
        var flag = true       // 定义标志变量
        // 如果"星级"字段为空
        if(arr(6)==null || arr(6).trim.isEmpty){
          starCounter.add(1)    // 全局计数器 + 1
          flag = false
        }

        if(arr(10)==null || arr(10).trim.isEmpty){
          scoreCounter.add(1)    // 全局计数器 + 1
          flag = false
        }

        if(arr(11)==null || arr(11).trim.isEmpty){
          commCounter.add(1)    // 全局计数器 + 1
          flag = false
        }

        flag
      })

    // 显示
    println(s"过滤前记录数${rdd2.count},过滤后记录后${rdd3.count}")
    println(s"删除的星级字段缺失的记录数是:${starCounter.value}")
    println(s"删除的评分字段缺失的记录数是:${scoreCounter.value}")
    println(s"删除的星级字段缺失的记录数是:${commCounter.value}")
    
    // 存储清洗结果
    rdd3.map(arr => arr.mkString(",")).coalesce(1).saveAsTextFile("output/hotel")
  }
}

执行以上代码,输出结果如下:

过滤前记录数19,过滤后记录后14
删除的星级字段缺失的记录数是:2
删除的评分字段缺失的记录数是:2
删除的星级字段缺失的记录数是:2

《Flink原理深入与编程实战》