HBase Java API编程示例_2
在本节,我们介绍如下内容:
创建、修改和删除表的操作
下面这个示例,演示了使用HBase Java API创建、修改和删除表的操作。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
// 使用Java API,创建、修改和删除表
public class HBaseDemo2 {
private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";
private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";
public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
if (admin.tableExists(table.getTableName())) {
admin.disableTable(table.getTableName());
admin.deleteTable(table.getTableName());
}
admin.createTable(table);
}
public static void createSchemaTables(Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));
System.out.print("Creating table. ");
createOrOverwrite(admin, table);
System.out.println(" Done.");
}
}
public static void modifySchema(Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
TableName tableName = TableName.valueOf(TABLE_NAME);
if (!admin.tableExists(tableName)) {
System.out.println("Table does not exist.");
System.exit(-1);
}
HTableDescriptor table = admin.getTableDescriptor(tableName);
// 更新已经存在的表
HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
newColumn.setCompactionCompressionType(Algorithm.GZ);
newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
admin.addColumn(tableName, newColumn);
// 更新已经存在的列族
HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);
existingColumn.setCompactionCompressionType(Algorithm.GZ);
existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
table.modifyFamily(existingColumn);
admin.modifyTable(tableName, table);
// 禁用一个已经存在的表
admin.disableTable(tableName);
// 删除一个已经存在的列族
admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));
// 删除一个表(首先要禁用这个表)
admin.deleteTable(tableName);
}
}
public static void main(String[] args) throws IOException {
Configuration config = HBaseConfiguration.create();
// 添加任何必要的配置文件(hbase-site.xml, core-site.xml)
config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));
createSchemaTables(config);
modifySchema(config);
}
}
使用get获取一条数据
下面的示例代码,演示了如何使用Get对象来获取一条数据。
// 获取指定单元格的数据
public static void query(String tableName, String rowKey, String colFamily, String column)
throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(rowKey.getBytes()); // 设置行键
get.addFamily(colFamily.getBytes());
Result result = table.get(get);
byte[] valueBytes = result.getValue(colFamily.getBytes(), column == null ? null : column.getBytes());
String value = new String(valueBytes);
System.out.println("成绩是:" + value);
table.close();
}
// 查找多个单元格中的数据(给定多个行键)
public static List query(String tableName, String[] rowkeys, String family, String qualifier)
throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
List listOfGets = new ArrayList<>();
for (String rowkey : rowkeys) {
listOfGets.add(new Get(Bytes.toBytes(rowkey)));
}
Result[] records = table.get(listOfGets);
List values = new ArrayList<>();
for (Result r : records) {
byte[] b = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
values.add(new String(b));
}
table.close();
return values;
}
// 查找多个单元格中的数据(给定多个行键,多个列限定符)
public static List query(String tableName, String[] rowkeys, String family, String[] qualifiers)
throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName)); // 表
List listOfGets = new ArrayList<>();
for (String rowkey : rowkeys) {
listOfGets.add(new Get(Bytes.toBytes(rowkey)));
}
Result[] records = table.get(listOfGets); // 获取多条记录
// 解析每条记录
List values = new ArrayList<>();
StringBuilder builder;
for (Result r : records) {
builder = new StringBuilder();
// 解析每条记录中的每个列限定符
for (String qualifier : qualifiers) {
byte[] b = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
builder.append(new String(b));
builder.append(", ");
}
values.add(builder.toString());
}
table.close();
return values;
}
使用Scan获取多条数据
HBase的查询实现只提供两种方式:
- 1) 按指定RowKey获取唯一一条记录,get方法(org.apache.hadoop.hbase.client.Get)。
- 2) 按指定的条件获取一批记录,scan方法(org.apache.Hadoop.Hbase.client.Scan)。
使用scan方式可以实现条件查询功能。scan在使用时有以下几点值得注意:
- 1) scan可以通过setCaching与setBatch方法提高速度(以空间换时间)。
- 2) scan可以通过setStartRow与setEndRow来限定范围([start,end)start是闭区间,end是开区间)。范围越小,性能越高。通过巧妙的RowKey设计使我们批量获取记录集合中的元素挨在一起(应该在同一个Region下),可以在遍历结果时获得很好的性能。
- 3) scan可以通过setFilter方法添加过滤器,这也是分页、多条件查询的基础。
注:可在hbase-site.xml中配置缓存数量:
<property> <name>hbase.client.scanner.caching</name> <value>5</value> </property>
请看下面使用Scan扫描的代码实现。
// 在Scan()中,可以指定起始行和终止行,还可以指定时间范围作为过滤器(获得指定时间范围内的数据)
public static void scan(String tableName) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName)); // 表
Scan scan = new Scan(); // 无参构造器的意思是扫描整个数据
scan.setMaxVersions(2); // 指定获取的版本数量为2
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) { // Result代表获取的单行对象
System.out.println("扫描到的行是: " + Bytes.toString(result.getRow()));
}
scanner.close();
table.close();
}
// 扫描指定的行键范围
public static void scan(String tableName, String startingRowKey, String stoppingRowKey, String family,
String qualifier) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName)); // 表
Scan scan = new Scan(Bytes.toBytes(startingRowKey), Bytes.toBytes(stoppingRowKey));
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
byte[] value = result.getValue(Bytes.toBytes(family),Bytes.toBytes(qualifier));
System.out.println(" " + Bytes.toString(result.getRow()) + " => " + Bytes.toString(value));
}
scanner.close();
table.close();
}
// 以批处理的方式扫描
public static void scanInBatch(String tableName, String family) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName)); // 表
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(family));
scan.setBatch(2); // 每次rpc请求传输2行
System.out.println("批处理数量: " + scan.getBatch());
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println("数据: ");
for (Cell cell : result.listCells()) {
System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
}
}
scanner.close();
table.close();
}
对数据进行删除和修改
对数据删除,需要用到org.apache.hadoop.hbase.client.Delete对象。
// 删除整行
public static void delete(String tableName, String rowkey) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName)); // 表
Delete delete = new Delete(Bytes.toBytes(rowkey)); // 构造Delete对象
table.delete(delete); // 从表中删除
table.close();
}
// 删除一个列族中的给定列
public static void delete(String tableName, String rowkey, String family,String qualifier) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName)); // 表
Delete delete = new Delete(Bytes.toBytes(rowkey)); // 构造Delete对象
delete.addColumns(Bytes.toBytes(family),Bytes.toBytes(qualifier));// 删除指定列的所有版本
table.delete(delete);
table.close();
}