PySpark SQL字符串处理函数

毫无疑问,数据集的大多数列都是字符串类型的。PySpark SQL内置的字符串函数提供了操作字符串类型列的通用和强大的方法。一般来说,这些函数分为两类。

  • (1) 执行字符串转换的函数。
  • (2) 执行字符串提取(或替换)的函数,使用正则表达式。

1. 字符串去空格

最常见的字符串转换包括去空格、填充、大写转换、小写转换和字符串连接等。演示使用各种内置字符串函数转换字符串的各种方法,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 使用各种内置字符串函数转换字符串的各种方法。
sparkDF = spark.createDataFrame([(" Spark ",)], ["name"])

# 去空格
sparkDF.select(
      trim("name").alias("trim"),     	# 去掉"name"列两侧的空格
      ltrim("name").alias("ltrim"),   	# 去掉"name"列左侧的空格
      rtrim("name").alias("rtrim")    	# 去掉"name"列右侧的空格
    ).show()

执行上面的代码,输出结果如下:

+-----+------+------+
| trim| ltrim| rtrim|
+-----+------+------+
|Spark|Spark | Spark|
+-----+------+------+

2. 字符串填充

用给定的填充字符串将字符串填充到指定长度,代码如下:

from pyspark.sql.functions import *

# 首先去掉"Spark"前后的空格,然后填充到8个字符长
sparkDF \
    .select(trim("name").alias("trim")) \
    .select(
        lpad("trim", 8, "-").alias("lpad"),   # 宽度为8,不够的话,左侧填充"-"
        rpad("trim", 8, "=").alias("rpad")    # 宽度为8,不够的话,右侧填充"="
     ) \
    .show()

执行以上代码,输出结果如下:

+--------+--------+
|     lpad|    rpad|
+--------+--------+
|---Spark|Spark===|
+--------+--------+

3. 字符串转换

字符串之间可以使用转换函数进行多种转换,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 使用concatenation, uppercase, lowercase 和reverse转换一个字符串
sentenceDF = spark.createDataFrame([("Spark", "is", "excellent")], ["subject", "verb", "adj"])
sentenceDF \
      .select(concat_ws(" ","subject", "verb", "adj").alias("sentence")) \
      .select(
        lower("sentence").alias("lower"),        	# 转小写
        upper("sentence").alias("upper"),       	# 转大写
        initcap("sentence").alias("initcap"), 	# 转首字母大写
        reverse("sentence").alias("reverse")   	# 反转
      ) \
      .show()

# 从一个字符转换到另一个字符
sentenceDF \
      .select("subject", translate("subject", "pr", "oc").alias("translate")) \
      .show()

执行以上代码,输出结果如下:

+------------------+------------------+------------------+------------------+
|               lower|                upper|             initcap|           reverse|
+------------------+------------------+------------------+------------------+
|spark is excellent|SPARK IS EXCELLENT|Spark Is Excellent|tnellecxe si krapS|
+------------------+------------------+------------------+------------------+

+-------+---------+
|subject|translate|
+-------+---------+
|  Spark|    Soack|
+-------+---------+

4. 字符串提取

如果想从字符串列值中替换或提取子字符串,可以使用regexp_extract()和regexp_replace()函数。这两个函数通过传入的正则表达式模式来实现替换或提取。

使用regexp_extract函数(),代码如下:

from pyspark.sql.functions import *

# 使用regexp_extract字符串函数来提取"fox",使用一个模式
strDF = spark.createDataFrame([("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"",)], ["comment"])

# 使用一个模式
strDF.select(regexp_extract("comment", "[a-z]*o[xw]",0).alias("substring")).show()

执行以上代码,输出结果如下所示。

+---------+
|substring|
+---------+
|       fox|
+---------+

5. 字符串替换

如果要替换字符串的某一部分子字符串,使用regexp_replace()函数。regexp_replace()字符串函数的输入参数是字符串列、匹配的模式、以及替换的值。使用regexp_replace()函数,代码如下:

from pyspark.sql.functions import *

# 用regexp_replace字符串函数将“fox”和“Caw”替换为“animal”
strDF = spark.createDataFrame([("A fox saw a crow sitting on a tree singing \"Caw! Caw! Caw!\"",)], ["comment"])

# 下面两行产生相同的输出
strDF.select(regexp_replace("comment","fox|crow","animal").alias("new_comment")).show(truncate=False)
strDF.select(regexp_replace("comment","[a-z]*o[xw]","animal").alias("new_comment")).show(truncate=False)

执行以上代码,输出结果如下:

+----------------------------------------------------------------+
|new_comm                                                                   |
+----------------------------------------------------------------+
|A animal saw a animal sitting on a tree singing "Caw! Caw! Caw!"|
+----------------------------------------------------------------+

+----------------------------------------------------------------+
|new_comm                                                                   |
+----------------------------------------------------------------+
|A animal saw a animal sitting on a tree singing "Caw! Caw! Caw!"|
+----------------------------------------------------------------+

例如,使用regexp_replace()函数从从混乱的数据中抽取出手机号,代码如下:

telDF = spark.createDataFrame([("135a-123b4-c5678",)], ["tel"])
telDF.withColumn("phone",regexp_replace('tel',"-|\\D","")).show()

执行以上代码,输出结果如下:

+----------------+-----------+
|               tel|        phone|
+----------------+-----------+
|135a-123b4-c5678|13512345678|
+----------------+-----------+

《PySpark原理深入与编程实战》