发布日期:2023-06-13 VIP内容

示例:使用Spark查找中位数和分位数

Spark中的中位数和分位数计算都可以使用DataFrame API或Spark SQL来执行。可以使用内置函数,如approxQuantile、percentile_approx、sort和selectExpr来执行这些计算。

在本教程中,我们将通过一些示例来学习如何使用Spark查找中位数和分位数。

下面我们创建一个包含产品销售信息的样例DataFrame,并尝试使用它计算销售额的中位数和分位数。

1、创建样例DataFrame

创建一个示例DataFrame,它包含两列: Product 和 Price。这个数据集表示销售信息,其中每行包含产品名称及其相应的价格。

// 导入依赖
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建DataFrame
val data = Seq(
  ("Product A", 100.0),
  ("Product B", 150.0),
  ("Product C", 200.0),
  ("Product D", 125.0),
  ("Product E", 180.0),
  ("Product F", 300.0),
  ("Product G", 220.0),
  ("Product H", 170.0),
  ("Product I", 240.0),
  ("Product J", 185.0)
)


val df = spark.createDataFrame(data).toDF("Product", "Price")

df.printSchema()

df.show()

执行以上代码,输出内容如下:

root
 |-- Product: string (nullable = true)
 |-- Price: double (nullable = false)

+---------+-----+
|  Product|Price|
+---------+-----+
|Product A|100.0|
|Product B|150.0|
|Product C|200.0|
|Product D|125.0|
|Product E|180.0|
|Product F|300.0|
|Product G|220.0|
|Product H|170.0|
|Product I|240.0|
|Product J|185.0|
+---------+-----+

2、计算中位数

在Spark中,中位数是一种统计度量,用于查找数据集的中间值。它表示将数据的上半部分与下半部分分开的值。

Spark提供了各种统计函数,包括approxQuantile()和median(),可以用来近似中位数。approxQuantile()函数使用给定的分位数概率列表计算一个DataFrame列的分位数。通过指定0.5的概率,可以获得近似中位数。

2.1 使用median()函数

在Spark中,计算中位数需要对数据集进行排序并找到中间值。可以对DataFrame进行排序并提取中间行以获得中值。

从Spark 3.4.0开始,org.apache.spark.sql.functions包中新增加了一个median()函数。它是一个聚合函数,用于返回一组值(例如,Column)的中位数。

median()函数的定义如下:

def median(e: Column): Column

    Aggregate function: returns the median of the values in a group.

    Since 3.4.0

使用median()函数计算中位数的代码如下:

// 计算中位数
val medianValue = df.select(median(col("Price"))).first().getDouble(0)

// 输出结果
println(s"Price列的中位数是: $medianValue")

在上面的代码中:

  • 创建了一个示例DataFrame,它包含两列: Product和Price。它表示销售信息,其中每行包含产品名称及其相应的价格。
  • 然后在Price列上应用median()函数,结果是一个包含中位数值的单行DataFrame。
  • 最后,使用first()函数检索这个中位数值,并使用getDouble(0)作为Double访问。

执行以上代码,输出结果如下:

Price列的中位数是: 182.5

2.2 使用approxQuantile()函数

Spark提供了各种统计函数,包括approxQuantile(),它可以用来近似中位数。approxQuantile()函数使用给定的分位数概率列表计算DataFrame列的分位数。通过指定0.5的概率,可以近似中位数。

// 计算分位数
val quantileProbabilities = Array(0.5)
val quantiles = df.stat.approxQuantile("Price", quantileProbabilities, 0.01)

val median = quantiles(0)

// 输出结果
println(s"Price列的中位数是: $median")

在上面的代码中:

  • 在Price列上调用approxQuantile()函数,并使用包含所需分位数概率(在本例中中位数为0.5)和相对误差参数(在本例中为0.01)的数组。
  • 生成的结果数组将包含近似的分位数值,可以通过根据所需的百分位数对数组进行索引来访问它们。

执行以上代码,输出结果如下:

Price列的中位数是: 180.0

3、计算分位数

在Spark中,分位数是将数据集划分为大小相等的区间的统计度量。它们提供有关数据分布的信息,并帮助确定特定百分位数的值。

Spark提供了一个名为approxQuantile()的函数,用于计算给定DataFrame列的分位数。这个函数使用一组概率和一个可选的相对误差参数来近似分位数。

在下面的示例代码中,使用approxQuantile()计算各分位数:

// 计算各分位数
val quantileProbabilities = Array(0.25, 0.5, 0.75)
val quantiles = df.stat.approxQuantile("Price", quantileProbabilities, 0.01)

val quantile25th = quantiles(0)
val median = quantiles(1)
val quantile75th = quantiles(2)

// 输出结果
println(s"Price的第一四分位数是: $quantile25th")
println(s"Price的第二四分位数(中位数)是: $median")
println(s"Price的第三四分位数是: $quantile75th")

在上面的示例代码中:

  • 在Price列上调用approxQuantile()函数,并使用包含所需分位数概率(在本例中分别是0.25、0.5和0.75)和相对误差参数(在本例中为0.01)的数组。
  • 生成的结果数组将包含中位数价格、第一四分位数价格和第三四分位数价格。

执行以上代码,输出内容如下:

Price的第一四分位数是: 150.0
Price的第二四分位数(中位数)是: 180.0
Price的第三四分位数是: 220.0

注意,approxQuantile()函数提供了分位数的近似值,近似值的级别可以通过调整相对误差参数来控制。如果需要精确的分位数值,可能需要探索特定于自己的用例的自定义方法或替代库。

4、完整代码

最后,完整的代码如下:

// 导入依赖
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// 创建一个SparkSession
val spark = SparkSession.builder()
  .appName("Spark Demo")
  .master("local")
  .getOrCreate()

// 创建一个包含销售信息的样例DataFrame
val data = Seq(
  ("Product A", 100.0),
  ("Product B", 150.0),
  ("Product C", 200.0),
  ("Product D", 125.0),
  ("Product E", 180.0),
  ("Product F", 300.0),
  ("Product G", 220.0),
  ("Product H", 170.0),
  ("Product I", 240.0),
  ("Product J", 185.0)
)

val df = spark.createDataFrame(data).toDF("Product", "Price")

// 计算中位数
val medianValue = df.select(median(col("Price"))).first().getDouble(0)

// 计算四分位数
val quantileProbabilities = Array(0.25, 0.5, 0.75)
val quantiles = df.stat.approxQuantile("Price", quantileProbabilities, 0.01)

val quantile25th = quantiles(0)
val median = quantiles(1)
val quantile75th = quantiles(2)

// 输出结果
println(s"Price列的中位数是: $medianValue")
println(s"Price的第一四分位数是: $quantile25th")
println(s"Price的第二四分位数(中位数)是: $median")
println(s"Price的第三四分位数是: $quantile75th")