HBase Java API编程示例_2
在本节,我们介绍如下内容:
创建、修改和删除表的操作
下面这个示例,演示了使用HBase Java API创建、修改和删除表的操作。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; // 使用Java API,创建、修改和删除表 public class HBaseDemo2 { private static final String TABLE_NAME = "MY_TABLE_NAME_TOO"; private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY"; public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException { if (admin.tableExists(table.getTableName())) { admin.disableTable(table.getTableName()); admin.deleteTable(table.getTableName()); } admin.createTable(table); } public static void createSchemaTables(Configuration config) throws IOException { try (Connection connection = ConnectionFactory.createConnection(config); Admin admin = connection.getAdmin()) { HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME)); table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE)); System.out.print("Creating table. "); createOrOverwrite(admin, table); System.out.println(" Done."); } } public static void modifySchema(Configuration config) throws IOException { try (Connection connection = ConnectionFactory.createConnection(config); Admin admin = connection.getAdmin()) { TableName tableName = TableName.valueOf(TABLE_NAME); if (!admin.tableExists(tableName)) { System.out.println("Table does not exist."); System.exit(-1); } HTableDescriptor table = admin.getTableDescriptor(tableName); // 更新已经存在的表 HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF"); newColumn.setCompactionCompressionType(Algorithm.GZ); newColumn.setMaxVersions(HConstants.ALL_VERSIONS); admin.addColumn(tableName, newColumn); // 更新已经存在的列族 HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT); existingColumn.setCompactionCompressionType(Algorithm.GZ); existingColumn.setMaxVersions(HConstants.ALL_VERSIONS); table.modifyFamily(existingColumn); admin.modifyTable(tableName, table); // 禁用一个已经存在的表 admin.disableTable(tableName); // 删除一个已经存在的列族 admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8")); // 删除一个表(首先要禁用这个表) admin.deleteTable(tableName); } } public static void main(String[] args) throws IOException { Configuration config = HBaseConfiguration.create(); // 添加任何必要的配置文件(hbase-site.xml, core-site.xml) config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml")); config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml")); createSchemaTables(config); modifySchema(config); } }
使用get获取一条数据
下面的示例代码,演示了如何使用Get对象来获取一条数据。
// 获取指定单元格的数据 public static void query(String tableName, String rowKey, String colFamily, String column) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(rowKey.getBytes()); // 设置行键 get.addFamily(colFamily.getBytes()); Result result = table.get(get); byte[] valueBytes = result.getValue(colFamily.getBytes(), column == null ? null : column.getBytes()); String value = new String(valueBytes); System.out.println("成绩是:" + value); table.close(); } // 查找多个单元格中的数据(给定多个行键) public static Listquery(String tableName, String[] rowkeys, String family, String qualifier) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); List listOfGets = new ArrayList<>(); for (String rowkey : rowkeys) { listOfGets.add(new Get(Bytes.toBytes(rowkey))); } Result[] records = table.get(listOfGets); List values = new ArrayList<>(); for (Result r : records) { byte[] b = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)); values.add(new String(b)); } table.close(); return values; } // 查找多个单元格中的数据(给定多个行键,多个列限定符) public static List query(String tableName, String[] rowkeys, String family, String[] qualifiers) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); // 表 List listOfGets = new ArrayList<>(); for (String rowkey : rowkeys) { listOfGets.add(new Get(Bytes.toBytes(rowkey))); } Result[] records = table.get(listOfGets); // 获取多条记录 // 解析每条记录 List values = new ArrayList<>(); StringBuilder builder; for (Result r : records) { builder = new StringBuilder(); // 解析每条记录中的每个列限定符 for (String qualifier : qualifiers) { byte[] b = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier)); builder.append(new String(b)); builder.append(", "); } values.add(builder.toString()); } table.close(); return values; }
使用Scan获取多条数据
HBase的查询实现只提供两种方式:
- 1) 按指定RowKey获取唯一一条记录,get方法(org.apache.hadoop.hbase.client.Get)。
- 2) 按指定的条件获取一批记录,scan方法(org.apache.Hadoop.Hbase.client.Scan)。
使用scan方式可以实现条件查询功能。scan在使用时有以下几点值得注意:
- 1) scan可以通过setCaching与setBatch方法提高速度(以空间换时间)。
- 2) scan可以通过setStartRow与setEndRow来限定范围([start,end)start是闭区间,end是开区间)。范围越小,性能越高。通过巧妙的RowKey设计使我们批量获取记录集合中的元素挨在一起(应该在同一个Region下),可以在遍历结果时获得很好的性能。
- 3) scan可以通过setFilter方法添加过滤器,这也是分页、多条件查询的基础。
注:可在hbase-site.xml中配置缓存数量:
<property> <name>hbase.client.scanner.caching</name> <value>5</value> </property>
请看下面使用Scan扫描的代码实现。
// 在Scan()中,可以指定起始行和终止行,还可以指定时间范围作为过滤器(获得指定时间范围内的数据) public static void scan(String tableName) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); // 表 Scan scan = new Scan(); // 无参构造器的意思是扫描整个数据 scan.setMaxVersions(2); // 指定获取的版本数量为2 ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { // Result代表获取的单行对象 System.out.println("扫描到的行是: " + Bytes.toString(result.getRow())); } scanner.close(); table.close(); } // 扫描指定的行键范围 public static void scan(String tableName, String startingRowKey, String stoppingRowKey, String family, String qualifier) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); // 表 Scan scan = new Scan(Bytes.toBytes(startingRowKey), Bytes.toBytes(stoppingRowKey)); scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { byte[] value = result.getValue(Bytes.toBytes(family),Bytes.toBytes(qualifier)); System.out.println(" " + Bytes.toString(result.getRow()) + " => " + Bytes.toString(value)); } scanner.close(); table.close(); } // 以批处理的方式扫描 public static void scanInBatch(String tableName, String family) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); // 表 Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(family)); scan.setBatch(2); // 每次rpc请求传输2行 System.out.println("批处理数量: " + scan.getBatch()); ResultScanner scanner = table.getScanner(scan); for (Result result : scanner) { System.out.println("数据: "); for (Cell cell : result.listCells()) { System.out.println(Bytes.toString(CellUtil.cloneValue(cell))); } } scanner.close(); table.close(); }
对数据进行删除和修改
对数据删除,需要用到org.apache.hadoop.hbase.client.Delete对象。
// 删除整行 public static void delete(String tableName, String rowkey) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); // 表 Delete delete = new Delete(Bytes.toBytes(rowkey)); // 构造Delete对象 table.delete(delete); // 从表中删除 table.close(); } // 删除一个列族中的给定列 public static void delete(String tableName, String rowkey, String family,String qualifier) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); // 表 Delete delete = new Delete(Bytes.toBytes(rowkey)); // 构造Delete对象 delete.addColumns(Bytes.toBytes(family),Bytes.toBytes(qualifier));// 删除指定列的所有版本 table.delete(delete); table.close(); }