Phoenix SQL语法与批处理执行

Phoenix将HBase的数据模型映射到关系型世界,支持所有标准SQL查询构造,包括SELECT,FROM,WHERE,GROUP BY,HAVING,ORDER BY等。

它还支持一整套DML命令以及通过DDL命令创建表和版本化增量更改。Phoenix的目标:通过定义明确的行业标准API,成为Hadoop的OLTP和运营分析的可信数据平台。

数据类型

Apache Phoenix SQL支持如下这些数据类型:

  • 整数:integer
  • 小数:float, double, decimal, decimal(10,2)
  • 字符串:char(10), varchar, varchar(255),值必须用单引号,不能用双引号
  • 布尔:boolean
  • 时间:time, date, timestamp
  • 字节数组:binary, varbinary
  • 数组:varchar array, char(10) array[5], integer[] integer[100]

Phoenix SQL语法

DDL支持:CREATE TABLE,DROP TABLE,ALTER TABLE。

DML持:UPSERT VALUES用于逐行插入,UPSERT SELECT用于在相同或不同的表之间传输大量数据,DELETE用于删除行。

Join连接并不完全受支持。不支持FULL OUTER JOIN和CROSS JOIN。

Phoenix SQL在CRUD操作语法解释如下。

create

创建一个简单的user表,id为主键,d为列族(如果不指定列族,内部映射到'0'列族)。可定义DDL属性'DEFAULT_COLUMN_FAMILY=列族名'来覆盖默认的列族名。

create table user(
        id integer not null primary key,
        d.username varchar,
        d.address varchar
);

upsert

注意,phoenix没有insert,其insert与update合起来叫做:upsert。

现在更新/插入两行。这里,显式地为id列设置值。在内部,这个SQL调用被转换为一个HBase put。

upsert into user values(1,'张三','北京市');
upsert into user values(2,'李四','上海市');

select

查询指定的用户

select username from user;

alter

修改表结构。

alter table user add zipcode integer;

delete

删除行。

delete from user where id=1;

下面演示常见的Phoenix SQL语法的使用。

--创建电商表
CREATE TABLE t1(
	pkey  integer NOT NULL PRIMARY KEY,
	p.name varchar,
	p.price double,
	c.name varchar,
	c.address varchar
);

-- 插入数据
-- 1) 商品数据
UPSERT INTO t1(pkey, p.name, p.price) values(1, '衬衣',168.50);
UPSERT INTO t1(pkey, p.name, p.price) values(2, '电脑',5168.50);

UPSERT INTO t1(pkey, c.name, c.address) values(3, '张三','北京市');
UPSERT INTO t1(pkey, c.name, c.address) values(4, '李四','上海市');

UPSERT INTO t1 values(5,'图书', 68.80, '王老五','深圳市');
UPSERT INTO t1(pkey, p.name, p.price) values(3,'手机',3568.00);
UPSERT INTO t1(pkey, p.price) values(3,2538.00);

-- 动态修改表结构
ALTER TABLE t1 ADD c.phone varchar;
UPSERT INTO t1(pkey, c.name, c.address, c.phone) values(5, '赵小六','成都市','13566668888');

-- 删除
DELETE FROM t1 WHERE pkey=2;
DELETE FROM t1 WHERE p.name='手机';
DELETE FROM t1;

-- 查询
SELECT * FROM t1;
SELECT pkey, p.name,c.name FROM t1;

-- 删除表
DROP TABLE t1;

执行批处理

我们也可以创建自己的SQL脚本并使用命令行工具执行它们。可以通过Phoenix的bin目录下的psql.py脚本加载CSV数据或者执行包含sql脚本的文件。

现在我们来看一个例子。导航到Phoenix安装位置的bin/目录。

1)首先,创建一个us_population.sql文件,包含一个表定义:

CREATE TABLE IF NOT EXISTS us_population (
      state CHAR(2) NOT NULL,
      city VARCHAR NOT NULL,
      population BIGINT
      CONSTRAINT my_pk PRIMARY KEY (state, city));

2)然后创建一个us_population.csv文件,其中包含要放入该表的一些数据:

NY,New York,8143197
CA,Los Angeles,3844829
IL,Chicago,2842518
TX,Houston,2016582
PA,Philadelphia,1463281
AZ,Phoenix,1461575
TX,San Antonio,1256509
CA,San Diego,1255540
TX,Dallas,1213825
CA,San Jose,912332

3)最后,创建一个us_population_queries.sql文件,包含希望在该数据上运行的查询。

SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC;

4)在命令行终端,执行如下的命令:

$ ./psql.py localhost:2181 us_population.sql us_population.csv us_population_queries.sql

这条命令同时做了三件事:创建表、插入数据、查询结果。典型的upsert速率是每秒20K - 50K行(取决于行有多宽)。


《Spark原理深入与编程实战》