使用Python编写Hive UDF

Hive有一组丰富的函数可以用来执行分析。但是,有时可能会出现仅使用内置函数无法满足需求的情况。在这种情况下,用户需要实现自定义用户定义函数(UDF)并将其提供给Hive查询。

由于Hadoop框架是用Java编写的,大多数Hadoop开发人员自然更喜欢用Java编写UDF。然而,Apache也使非Java开发人员能够轻松地使用Hadoop,这是通过使用Hadoop Streaming接口完成的!

【示例】使用Python编写和使用UDF

1. 首先编写Python数据处理脚本:

nano employees.py

编辑代码如下:

import sys

for line in sys.stdin:
   line = line.strip()
   (emp_id,emp_name) = line.split('\t')
   print(emp_id + '\t' + emp_name + ',亲')

2. 创建Hive表:

-- 创建hive表
create table employees (
  emp_id 	int,
  emp_name 	string
) row format delimited fields terminated by '\t';

3. 加载数据文件到Hive表:

hive> load data local inpath '/home/hduser/data/hive/peoples.txt' overwrite into table employees;

4. 将自定义Python数据处理文件添加到路径中:

hive> add file /home/hduser/employees.py;

5. 在查询中应用transform函数

hive> select transform(emp_id, emp_name) using 'python employees.py' as (employee_id, emp_name) 
      from employees;

输出结果如下所示:

1       张三,亲
2       李四,亲
3       王老五,亲

【示例】使用Python UDF处理订单

假设有一张 Hive 表,描述订单基本信息:

CREATE TABLE IF NOT EXISTS order_base(
    id                 INT         COMMENT  '自增ID',      
    order_sn           STRING      COMMENT  '订单编号',
    user_id            INT         COMMENT  '用户ID',
    shop_id            INT         COMMENT  '店铺ID',
    add_time           INT         COMMENT  '下单时间',
    pay_time           INT         COMMENT  '付款时间',
    delivery_name      STRING      COMMENT  '收件人姓名',
    delivery_address   STRING       COMMENT  '收件人地址',
    delivery_phone     STRING      COMMENT  '收件人电话'
) COMMENT '订单基本信息表'
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
  STORED AS TEXTFILE;

现在需要查询收件人地址中出现关键字“大学”的记录,找出对应的 (order_sn, delivery_address),通过 UDF 实现。

通过 Python 实现 Hive 的 UDF,Python 脚本需要以特定的方式读入和输出,除了必须引用 sys 包外,无须引用其他外部包。

这里,查询出现指定关键字记录的 UDF 的 Python 实现过程 FindSpecifiedWords.py 如下:

import sys

for line in sys.stdin:
    order_sn, delivery_address = line.strip().split("\t")

    found = "N"
    pos = delivery_address.decode("utf8").find(u'\u5927\u5b66')
    if(pos > -1):
        found = "Y"
    print("\t".join([order_sn, delivery_address, found]))

这里,Python 实现的 UDF,需要批量的读入数据,并一对一的批量输出。其中,u'\u5927\u5b66' 是“大学”的 utf8 的编码。

使用 Python 实现的 UDF 完成后,需要通过 ADD FILE 指令添加至 Hive 中进行注册,无需起别名:

hive > ADD FILE /mnt/whlminds/FindSpecifiedWords.py

注册完后,Python 实现的 UDF 就可以通过 TRANSFORM...AS 在 HiveQL 中使用,语法如下:

SELECT TRANSFORM ()
USING 'python '
AS ()
FROM ;

其中, SELECT 中的 columns 是 FROM 中 table 的列名, 而 AS 中的 columns 是经过 USING 中 Python 脚本 python_script 计算返回的列名。

这里,查找包含指定关键字的 HiveQL 脚本如下:

SELECT t2.order_sn, t2.delivery_address
FROM
(SELECT TRANSFORM (t1.order_sn, t1.delivery_address)
 USING 'python FindSpecifiedWords.py'
 AS (order_sn STRING, delivery_address STRING, found STRING)
 FROM order_base t1
 WHERE t1.pay_time >= UNIX_TIMESTAMP('2015-10-04 00:00:00')
 AND t1.pay_time < UNIX_TIMESTAMP('2015-10-05 00:00:00')
) t2
WHERE t2.found = 'Y';

为查询方便,可以将 Hive 中注册,以及调用过程一起写在 SQL 脚本 find_specified_order.sql 中:

ADD FILE /home/hduser/scripts/FindSpecifiedWords.py;

SELECT t2.order_sn, t2.delivery_address
FROM
(SELECT TRANSFORM (t1.order_sn, t1.delivery_address)
 USING 'python FindSpecifiedWords.py'
 AS (order_sn STRING, delivery_address STRING, found STRING)
 FROM order_base t1
 WHERE t1.pay_time >= UNIX_TIMESTAMP('2015-10-04 00:00:00')
 AND t1.pay_time < UNIX_TIMESTAMP('2015-10-05 00:00:00')
) t2
WHERE t2.found = 'Y';
然后,执行 SQL 脚本:
hive> hive -f find_specified_order.sql

Java-UDF vs. Python-UDF

这里比较下二者的异同:

  • Java 实现 UDF,需要引用包含 Hive API 的外部 jar 包,而 Python 无需引起其他外部包;
  • Java 实现 UDF 后,需要打包后才可被 HiveQL 调用,而通过 Python 实现 UDF 后,可以在 HiveQL 中直接被调用;
  • Java 实现 UDF,对读入和输出数据方式没有要求,实现的 UDF 可以输入一条记录的指定列数据,输出结果可以直接在 HiveQL 的 WHERE 中用于判断条件使用;Python 实现的 UDF,对读入和输出数据方式有特殊要求,需要对 HiveQL 中表的指定列数据批量读入,然后一对一地批量输出,因此,通过 Python 实现的 UDF 可以结合子查询使用。

《PySpark原理深入与编程实战》