PySpark SQL日期时间处理函数
为了帮助执行复杂的分析,PySpark SQL提供了一组强大而灵活的聚合函数、连接多个数据集的函数、一组内置的高性能函数和一组高级分析函数。
PySpark内置的日期时间函数大致可分为以下三个类别:
- 执行日期时间格式转换的函数。
- 执行日期时间计算的函数。
- 从日期时间戳中提取特定值(如年、月、日等)的函数。
日期和时间转换函数有助于将字符串转换为日期、时间戳或Unix时间戳,反之亦然。在内部,它使用Java日期格式模式语法。这些函数使用的默认的日期格式是yyyy-mm-dd HH:mm:ss。因此,如果日期或时间戳列的日期格式不同,那么需要向这些转换函数传入指定的模式。
1. 将字符串转换为日期或时间戳
例如,将字符串类型的日期和时间戳转换为PySpark SQL的date和timestamp类型,代码如下:
from pyspark.sql import SparkSession from pyspark.sql.functions import * spark = SparkSession.builder \ .master("spark://localhost:7077") \ .appName("pyspark demo") \ .getOrCreate() # 1)日期和时间转换函数:这些函数使用的默认的日期格式是yyyy-mm-dd HH:mm:ss # 构造一个简单的DataFrame,注意最后两列不遵循默认日期格式 testDate = [(1, "2019-01-01", "2019-01-01 15:04:58", "01-01-2019", "12-05-2018 45:50")] testDateTSDF = spark.createDataFrame(testDate,schema=["id", "date", "timestamp", "date_str", "ts_str"]) # testDateTSDF.printSchema() # testDateTSDF.show() # 将这些字符串转换为date、timestamp和 unix timestamp,并指定一个自定义的date和timestamp 格式 testDateResultDF = testDateTSDF.select( to_date('date').alias("date1"), to_timestamp('timestamp').alias("ts1"), to_date('date_str',"MM-dd-yyyy").alias("date2"), to_timestamp('ts_str',"MM-dd-yyyy mm:ss").alias("ts2"), unix_timestamp('timestamp').alias("unix_ts") ) testDateResultDF.printSchema() testDateResultDF.show(truncate=False)
执行以上代码,输出结果如下:
root |-- date1: date (nullable = true) |-- ts1: timestamp (nullable = true) |-- date2: date (nullable = true) |-- ts2: timestamp (nullable = true) |-- unix_ts: long (nullable = true) +----------+-------------------+----------+-------------------+----------+ |date1 |ts1 |date2 |ts2 |unix_ts | +----------+-------------------+----------+-------------------+----------+ |2019-01-01|2019-01-01 15:04:58|2019-01-01|2018-12-05 00:45:50|1546326298| +----------+-------------------+----------+-------------------+----------+
2. 将日期或时间戳转换为字符串
将日期或时间戳转换为时间字符串是很容易的,方法是使用date_format()函数和定制日期格式,或者使用from_unixtime()函数将Unix时间戳(以秒为单位)转换成字符串。请看日期和时间戳转换为格式字符串的转换例子,代码如下:
from pyspark.sql.functions import * testDateResultDF.select( date_format('date1', "dd-MM-yyyy").alias("date_str"), date_format('ts1', "dd-MM-yyyy HH:mm:ss").alias("ts_str"), from_unixtime('unix_ts',"dd-MM-yyyy HH:mm:ss").alias("unix_ts_str") ).show()
执行以上代码,输出结果如下:
+----------+-------------------+-------------------+ | date_str| ts_str| unix_ts_str| +----------+-------------------+-------------------+ |01-01-2019|01-01-2019 15:04:58|01-01-2019 15:04:58| +----------+-------------------+-------------------+
3. 日期计算函数
日期-时间计算函数有助于计算两个日期或时间戳的相隔时间,以及执行日期或时间算术运算。关于日期-时间计算的示例,代码如下:
from pyspark.sql.functions import * # 2) 日期-时间(date-time)计算函数 data = [("黄渤", "2016-01-01", "2017-10-15"), ("王宝强", "2017-02-06", "2017-12-25")] employeeData = spark.createDataFrame(data, schema=["name", "join_date", "leave_date"]) employeeData.show()
执行以上代码,输出内容如下:
+------+----------+----------+ | name| join_date|leave_date| +------+----------+----------+ | 黄渤|2016-01-01|2017-10-15| | 王宝强|2017-02-06|2017-12-25| +------+----------+----------+
执行date()和month()计算,代码如下:
from pyspark.sql.functions import * employeeData.select( 'name', datediff('leave_date', 'join_date').alias("days"), months_between('leave_date', 'join_date').alias("months"), last_day('leave_date').alias("last_day_of_mon") ).show()
执行以上代码,输出内容如下:
+------+----+-----------+---------------+ | name|days| months|last_day_of_mon| +------+----+-----------+---------------+ | 黄渤| 653| 21.4516129| 2017-10-31| | 王宝强| 322|10.61290323| 2017-12-31| +------+----+-----------+---------------+
执行日期加、减计算,代码如下:
from pyspark.sql.functions import * oneDate = spark.createDataFrame([("2019-01-01",)],schema=["new_year"]) oneDate.select( date_add('new_year', 14).alias("mid_month"), date_sub('new_year', 1).alias("new_year_eve"), next_day('new_year', "Mon").alias("next_mon") ).show()
执行上面的代码,输出内容如下:
+----------+------------+----------+ | mid_month|new_year_eve| next_mon| +----------+------------+----------+ |2019-01-15| 2018-12-31|2019-01-07| +----------+------------+----------+
4. 转换不规范的日期
有的时候,采集到的数据是不受控制的,得到的日期可能是不规范的,这就需要将这些不规范的日期转换为规范的表示。对不规范日期的转换,代码如下:
from pyspark.sql.functions import * # 转换不规范的日期: data = [("Nov 05, 2018 02:46:47 AM",),("Nov 5, 2018 02:46:47 PM",)] df = spark.createDataFrame(data,schema=["times"]) df.withColumn( "times2", from_unixtime( unix_timestamp("times", "MMM d, yyyy hh:mm:ss a"), "yyyy-MM-dd HH:mm:ss.SSSSSS" ) ).show(truncate=False)
执行以上代码,输出结果如下:
+------------------------+--------------------------+ |times |times2 | +------------------------+--------------------------+ |Nov 05, 2018 02:46:47 AM|2018-11-05 02:46:47.000000| |Nov 5, 2018 02:46:47 PM |2018-11-05 14:46:47.000000| +------------------------+--------------------------+
5. 处理时间序列数据
在处理时间序列数据(time-series data)时,经常需要提取日期或时间戳值的特定字段(如年、月、小时、分钟和秒)。例如,当需要按季度、月或周对所有股票交易进行分组时,就可以从交易日期提取该信息,并按这些值分组。从日期或时间戳中提取字段,代码如下:
from pyspark.sql.functions import * # 3)提取日期或时间戳值的特定字段(如年、月、小时、分钟和秒) # 从一个日期值中提取指定的日期字段 valentimeDateDF = spark \ .createDataFrame([("2019-02-14 13:14:52",)],["date"]) valentimeDateDF.select( year('date').alias("year"), # 年 quarter('date').alias("quarter"), # 季 month('date').alias("month"), # 月 weekofyear('date').alias("woy"), # 周 dayofmonth('date').alias("dom"), # 日 dayofyear('date').alias("doy"), # 天 hour('date').alias("hour"), # 小时 minute('date').alias("minute"), # 分 second('date').alias("second") # 秒 ).show()
执行以上代码,输出结果如下:
+----+-------+-----+---+---+---+----+------+------+ |year|quarter|month|woy|dom|doy|hour|minute|second| +----+-------+-----+---+---+---+----+------+------+ |2019| 1| 2| 7| 14| 45| 13| 14| 52| +----+-------+-----+---+---+---+----+------+------+