使用JDBC连接Phoenix

Apache Phoenix还提供了Java APIs,用于开发者编写代码。用户可以通过标准JDBC接口以编程方式访问Phoenix(进而间接访问HBase)。开发者可以像使用其它Java库一样在项目中使用这些API。

使用JDBC获得到HBase集群的连接,如下所示:

String myUrl = "jdbc:phoenix:localhost:2181";
Properties props = new Properties();
props.setProperty("CurrentSCN", Long.toString(ts));
Connection conn = DriverManager.connect(myUrl, props);
conn.createStatement().execute("UPSERT INTO myTable VALUES ('a')");
conn.commit();

这就相当于用HBase API做这件事:

myTable.put(Bytes.toBytes('a'),ts);

通过指定CurrentSCN告诉Phoenix,我们希望该连接的所有操作都在那个时间戳中完成。注意,这也适用于在连接上执行的查询——例如,上面的myTable上的查询不会看到它刚刚更新的数据,因为它只会看到在它的CurrentSCN属性之前创建的数据。这提供了一种执行快照、闪回或时间点查询的方法。

下面是一个使用标准JDBC连接并访问Phoenix的示例代码。

import java.sql.Connection;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * 使用Phoenix API实现对HBase数据库的CRUD操作
 */
public class PhoenixJdbc {

	static {
		try {
			Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
			System.out.println("Phoenix驱动加载成功!");
		} catch (ClassNotFoundException e) {
			System.out.println("无法加载Phoenix驱动!");
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		// 建表的sql语句
		String createSql = "create table if not exists user_tb(id INTEGER NOT NULL PRIMARY KEY, d.uname VARCHAR)";
		
		try (Connection connection = DriverManager.getConnection("jdbc:phoenix:localhost:2181:/hbase");
			Statement statement = connection.createStatement()) {
			System.out.println("已建立连接...");
			
			// 创建表并插入两条记录
			statement.executeUpdate(createSql);
			statement.executeUpdate("upsert into user_tb values (1,'张三')");
			statement.executeUpdate("upsert into user_tb values (2,'李四')");
			connection.commit();
			
			// 执行查询
			try(PreparedStatement ps = connection.prepareStatement("select * from user_tb");
					ResultSet rs = ps.executeQuery()) {
				System.out.println("查询到的值:");				
				while (rs.next()) {
					Integer id = rs.getInt("id");
					String name = rs.getString("uname");
					System.out.println("\tRow: " + id + " = " + name);
				}
			} catch (SQLException e) {
				System.out.println("查询失败");
				e.printStackTrace();
			} 
		} catch (SQLException e1) {
			System.out.println("无法建立连接...");
			e1.printStackTrace();
		}		
	}

}

《Spark原理深入与编程实战》