PySpark SQL用户自定义函数(UDF)
尽管PySpark SQL为大多数常见用例提供了大量的内置函数,但总会有一些情况下,这些功能都不能提供用户的用例所需要的功能。PySpark SQL提供了一个相当简单的工具来编写用户定义的函数(UDF),并在PySpark数据处理逻辑或应用程序中使用它们,就像使用内置函数一样。
UDF用于扩展框架的函数,并在多个DataFrame上重用这些函数。UDF实际上是用户可以扩展PySpark的功能以满足特定需求的一种方式。
在PySpark中,使用UDF涉及有三个步骤:
- (1) 第一步是用Python语法创建一个函数并进行测试。
- (2) 第二步是通过将函数名传递给PySpark SQL的udf()函数来注册它。
- (3) 第三步是在DataFrame代码或发出SQL查询时使用UDF。在SQL查询中使用UDF时,注册过程略有不同。
示例1
【示例】下面的示例用一个简单的UDF将数字等级转换为考查等级,它演示了前面提到的三个步骤。
首先创建一个包含学生成绩的DataFrame,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import * spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() # 创建学生成绩DataFrame studentDF = spark.createDataFrame( [ ("张三", 85), ("李四", 90), ("王老五", 55) ],["name","score"] ) studentDF.printSchema() studentDF.show()
执行以上代码,输出内容如下:
root |-- name: string (nullable = true) |-- score: integer (nullable = false) +------+-----+ | name|score| +------+-----+ | 张三| 85| | 李四| 90| | 王老五| 55| +------+-----+
将studentDF注册到名为students的临时视图,代码如下:
# 注册为视图 studentDF.createOrReplaceTempView("students") # spark.sql("select * from students").show()
接下来创建一个普通的Python函数,用来将成绩转换到考察等级,代码如下:
# 创建一个函数(普通的Python函数)将成绩转换到考察等级 def convertGrade(score): if score > 100: return "作弊" elif score >= 90: return "优秀" elif score >= 80: return "良好" elif score >= 70: return "中等" else: return "不及格" # 注册为一个UDF(在DataFrame API中使用时的注册方法) convertGradeUDF = udf(convertGrade) # 使用该UDF将成绩转换为字母等级 studentDF.select("name","score", convertGradeUDF(col("score")).alias("grade")).show()
最后,可以像使用普通PySpark内置函数一个使用该UDF,将成绩转换为字母等级,代码如下:
# 使用该UDF将成绩转换为字母等级 studentDF \ .select("name","score",convertGradeUDF(col("score")).alias("grade")) \ .show()
执行以上代码,输出结果如下:
+------+-----+------+ | name|score| grade| +------+-----+------+ | 张三| 85| 良好| | 李四| 90| 优秀| | 王老五| 55| 不及格| +------+-----+------+
当在SQL查询中使用UDF时,注册过程与上面略有不同,代码如下:
# 注册为UDF,在SQL中使用 spark.udf.register("convertGrade", convertGrade) spark.sql(""" select name, score, convertGrade(score) as grade from students""" ).show()
执行以上代码,输出结果如下:
+------+-----+------+ | name|score| grade| +------+-----+------+ | 张三| 85| 良好| | 李四| 90| 优秀| | 王老五| 55|不及格| +------+-----+------+
示例2
【示例】把DataFrame中名字字符串中每个单词的第一个字母都转换成大写字母。
首先,创建一个DataFrame,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import * spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark sql demo") \ .getOrCreate() columns = ["Seqno","Name"] data = [ ("1", "john jones"), ("2", "tracey smith"), ("3", "amy sanders") ] df = spark.createDataFrame(data=data,schema=columns) df.show(truncate=False)
执行以上代码,输出结果如下:
+-----+------------+ |Seqno|Name | +-----+------------+ |1 |john jones | |2 |tracey smith| |3 |amy sanders | +-----+------------+
接下来,创建一个普通的Python函数,它接受一个字符串参数并将每个单词的第一个字母转换为大写字母,代码如下:
def convertCase(str): resStr="" arr = str.split(" ") for x in arr: resStr= resStr + x[0:1].upper() + x[1:len(x)] + " " return resStr
然后通过将函数传递给PySpark SQL的pyspark.sql.functions.udf()这个函数,将函数convertCase()注册为UDF,代码如下:
convertUDF = udf(lambda z: convertCase(z), StringType())
因为udf()函数的默认类型就是StringType,因此,也可以编写不带返回类型的上述语句,代码如下:
convertUDF = udf(lambda z: convertCase(z))
现在可以在DataFrame列上将convertUDF()作为常规内置函数来使用,代码如下:
df.select(col("Seqno"), \ convertUDF(col("Name")).alias("Name") ) \ .show(truncate=False)
执行以上代码,输出结果如下:
+-----+-------------+ |Seqno|Name | +-----+-------------+ |1 |John Jones | |2 |Tracey Smith | |3 |Amy Sanders | +-----+-------------+
也可以在DataFrame的withColumn()函数上使用udf()函数。下面创建另一个upperCase()函数,它将输入字符串转换为大写,代码如下:
def upperCase(str): return str.upper()
将upperCase()这个Python函数转换为UDF,然后将其与DataFrame withColumn()一起使用。下面的例子将Name列的值转换为大写,并创建一个新列Curated Name,代码如下:
upperCaseUDF = udf(lambda z:upperCase(z),StringType()) df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show(truncate=False)
执行以上代码,输出结果如下:
+-----+------------+-------------+ |Seqno|Name | Cureated Name| +-----+------------+-------------+ |1 |john jones |JOHN JONES | |2 |tracey smith|TRACEY SMITH | |3 |amy sanders |AMY SANDERS | +-----+------------+-------------+
为了在PySpark SQL上使用convertCase()函数,需要使用spark.udf.register()在PySpark上注册这个函数,代码如下:
# 注册函数 spark.udf.register("convertUDF", convertCase,StringType()) # 创建临时视图 df.createOrReplaceTempView("NAME_TABLE") # 执行SQL查询,在SQL语句中使用自定义函数 spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE").show(truncate=False)
执行以上代码,输出结果如下:
+-----+-------------+ |Seqno|Name | +-----+-------------+ |1 |John Jones | |2 |Tracey Smith | |3 |Amy Sanders | +-----+-------------+
前面要创建UDF,需要两步处理:先创建一个Python函数,再将该函数注册为UDF。用户也可以通过注解来创建UDF,只需一步,代码如下:
@udf(returnType=StringType()) def upperCase(str): return str.upper() df.withColumn("Cureated Name", upperCase(col("Name"))).show(truncate=False)
执行以上代码,输出结果如下:
+-----+------------+-------------+ |Seqno|Name |Cureated Name| +-----+------------+-------------+ |1 |john jones |JOHN JONES | |2 |tracey smith|TRACEY SMITH | |3 |amy sanders |AMY SANDERS | +-----+------------+-------------+