使用Python编写Hive UDF
Hive有一组丰富的函数可以用来执行分析。但是,有时可能会出现仅使用内置函数无法满足需求的情况。在这种情况下,用户需要实现自定义用户定义函数(UDF)并将其提供给Hive查询。
由于Hadoop框架是用Java编写的,大多数Hadoop开发人员自然更喜欢用Java编写UDF。然而,Apache也使非Java开发人员能够轻松地使用Hadoop,这是通过使用Hadoop Streaming接口完成的!
【示例】使用Python编写和使用UDF
1. 首先编写Python数据处理脚本:
nano employees.py
编辑代码如下:
import sys for line in sys.stdin: line = line.strip() (emp_id,emp_name) = line.split('\t') print(emp_id + '\t' + emp_name + ',亲')
2. 创建Hive表:
-- 创建hive表 create table employees ( emp_id int, emp_name string ) row format delimited fields terminated by '\t';
3. 加载数据文件到Hive表:
hive> load data local inpath '/home/hduser/data/hive/peoples.txt' overwrite into table employees;
4. 将自定义Python数据处理文件添加到路径中:
hive> add file /home/hduser/employees.py;
5. 在查询中应用transform函数
hive> select transform(emp_id, emp_name) using 'python employees.py' as (employee_id, emp_name) from employees;
输出结果如下所示:
1 张三,亲 2 李四,亲 3 王老五,亲
【示例】使用Python UDF处理订单
假设有一张 Hive 表,描述订单基本信息:
CREATE TABLE IF NOT EXISTS order_base( id INT COMMENT '自增ID', order_sn STRING COMMENT '订单编号', user_id INT COMMENT '用户ID', shop_id INT COMMENT '店铺ID', add_time INT COMMENT '下单时间', pay_time INT COMMENT '付款时间', delivery_name STRING COMMENT '收件人姓名', delivery_address STRING COMMENT '收件人地址', delivery_phone STRING COMMENT '收件人电话' ) COMMENT '订单基本信息表' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
现在需要查询收件人地址中出现关键字“大学”的记录,找出对应的 (order_sn, delivery_address),通过 UDF 实现。
通过 Python 实现 Hive 的 UDF,Python 脚本需要以特定的方式读入和输出,除了必须引用 sys 包外,无须引用其他外部包。
这里,查询出现指定关键字记录的 UDF 的 Python 实现过程 FindSpecifiedWords.py 如下:
import sys for line in sys.stdin: order_sn, delivery_address = line.strip().split("\t") found = "N" pos = delivery_address.decode("utf8").find(u'\u5927\u5b66') if(pos > -1): found = "Y" print("\t".join([order_sn, delivery_address, found]))
这里,Python 实现的 UDF,需要批量的读入数据,并一对一的批量输出。其中,u'\u5927\u5b66' 是“大学”的 utf8 的编码。
使用 Python 实现的 UDF 完成后,需要通过 ADD FILE 指令添加至 Hive 中进行注册,无需起别名:
hive > ADD FILE /mnt/whlminds/FindSpecifiedWords.py
注册完后,Python 实现的 UDF 就可以通过 TRANSFORM...AS 在 HiveQL 中使用,语法如下:
SELECT TRANSFORM () USING 'python ' AS ( ) FROM ;
其中, SELECT 中的 columns 是 FROM 中 table 的列名, 而 AS 中的 columns 是经过 USING 中 Python 脚本 python_script 计算返回的列名。
这里,查找包含指定关键字的 HiveQL 脚本如下:
SELECT t2.order_sn, t2.delivery_address FROM (SELECT TRANSFORM (t1.order_sn, t1.delivery_address) USING 'python FindSpecifiedWords.py' AS (order_sn STRING, delivery_address STRING, found STRING) FROM order_base t1 WHERE t1.pay_time >= UNIX_TIMESTAMP('2015-10-04 00:00:00') AND t1.pay_time < UNIX_TIMESTAMP('2015-10-05 00:00:00') ) t2 WHERE t2.found = 'Y';
为查询方便,可以将 Hive 中注册,以及调用过程一起写在 SQL 脚本 find_specified_order.sql 中:
ADD FILE /home/hduser/scripts/FindSpecifiedWords.py; SELECT t2.order_sn, t2.delivery_address FROM (SELECT TRANSFORM (t1.order_sn, t1.delivery_address) USING 'python FindSpecifiedWords.py' AS (order_sn STRING, delivery_address STRING, found STRING) FROM order_base t1 WHERE t1.pay_time >= UNIX_TIMESTAMP('2015-10-04 00:00:00') AND t1.pay_time < UNIX_TIMESTAMP('2015-10-05 00:00:00') ) t2 WHERE t2.found = 'Y';然后,执行 SQL 脚本:
hive> hive -f find_specified_order.sql
Java-UDF vs. Python-UDF
这里比较下二者的异同:
- Java 实现 UDF,需要引用包含 Hive API 的外部 jar 包,而 Python 无需引起其他外部包;
- Java 实现 UDF 后,需要打包后才可被 HiveQL 调用,而通过 Python 实现 UDF 后,可以在 HiveQL 中直接被调用;
- Java 实现 UDF,对读入和输出数据方式没有要求,实现的 UDF 可以输入一条记录的指定列数据,输出结果可以直接在 HiveQL 的 WHERE 中用于判断条件使用;Python 实现的 UDF,对读入和输出数据方式有特殊要求,需要对 HiveQL 中表的指定列数据批量读入,然后一对一地批量输出,因此,通过 Python 实现的 UDF 可以结合子查询使用。