HBase Java API编程示例_2

在本节,我们介绍如下内容:

创建、修改和删除表的操作

下面这个示例,演示了使用HBase Java API创建、修改和删除表的操作。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;

// 使用Java API,创建、修改和删除表
public class HBaseDemo2 {

	private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";
	private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";

	public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
		if (admin.tableExists(table.getTableName())) {
			admin.disableTable(table.getTableName());
			admin.deleteTable(table.getTableName());
		}
		admin.createTable(table);
	}

	public static void createSchemaTables(Configuration config) throws IOException {
		try (Connection connection = ConnectionFactory.createConnection(config); 
				Admin admin = connection.getAdmin()) {
			HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));

			table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));
			System.out.print("Creating table. ");

			createOrOverwrite(admin, table);
			System.out.println(" Done.");
		}
	}

	public static void modifySchema(Configuration config) throws IOException {
		try (Connection connection = ConnectionFactory.createConnection(config); 
				Admin admin = connection.getAdmin()) {

			TableName tableName = TableName.valueOf(TABLE_NAME);
			if (!admin.tableExists(tableName)) {
				System.out.println("Table does not exist.");
				System.exit(-1);
			}

			HTableDescriptor table = admin.getTableDescriptor(tableName);

			// 更新已经存在的表
			HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
			newColumn.setCompactionCompressionType(Algorithm.GZ);
			newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
			admin.addColumn(tableName, newColumn);

			// 更新已经存在的列族
			HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);
			existingColumn.setCompactionCompressionType(Algorithm.GZ);
			existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
			table.modifyFamily(existingColumn);
			admin.modifyTable(tableName, table);

			// 禁用一个已经存在的表
			admin.disableTable(tableName);

			// 删除一个已经存在的列族
			admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));

			// 删除一个表(首先要禁用这个表)
			admin.deleteTable(tableName);
		}
	}

	public static void main(String[] args) throws IOException {
		Configuration config = HBaseConfiguration.create();

		// 添加任何必要的配置文件(hbase-site.xml, core-site.xml)
		config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
		config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));
		createSchemaTables(config);
		modifySchema(config);
	}

}

使用get获取一条数据

下面的示例代码,演示了如何使用Get对象来获取一条数据。

	// 获取指定单元格的数据
	public static void query(String tableName, String rowKey, String colFamily, String column) 
throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName));
		Get get = new Get(rowKey.getBytes()); // 设置行键
		get.addFamily(colFamily.getBytes());
		Result result = table.get(get);

		byte[] valueBytes = result.getValue(colFamily.getBytes(), column == null ? null : column.getBytes());
		String value = new String(valueBytes);
		System.out.println("成绩是:" + value);

		table.close();
	}

	// 查找多个单元格中的数据(给定多个行键)
	public static List query(String tableName, String[] rowkeys, String family, String qualifier)
			throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName));

		List listOfGets = new ArrayList<>();
		for (String rowkey : rowkeys) {
			listOfGets.add(new Get(Bytes.toBytes(rowkey)));
		}

		Result[] records = table.get(listOfGets);

		List values = new ArrayList<>();
		for (Result r : records) {
			byte[] b = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
			values.add(new String(b));
		}
		table.close();

		return values;
	}

	// 查找多个单元格中的数据(给定多个行键,多个列限定符)
	public static List query(String tableName, String[] rowkeys, String family, String[] qualifiers)
			throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName)); // 表

		List listOfGets = new ArrayList<>();
		for (String rowkey : rowkeys) {
			listOfGets.add(new Get(Bytes.toBytes(rowkey)));
		}

		Result[] records = table.get(listOfGets); // 获取多条记录

		// 解析每条记录
		List values = new ArrayList<>();
		StringBuilder builder;
		for (Result r : records) {
			builder = new StringBuilder();
			// 解析每条记录中的每个列限定符
			for (String qualifier : qualifiers) {
				byte[] b = r.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
				builder.append(new String(b));
				builder.append(", ");
			}
			values.add(builder.toString());
		}
		table.close();

		return values;
	}

使用Scan获取多条数据

HBase的查询实现只提供两种方式:

  • 1) 按指定RowKey获取唯一一条记录,get方法(org.apache.hadoop.hbase.client.Get)。
  • 2) 按指定的条件获取一批记录,scan方法(org.apache.Hadoop.Hbase.client.Scan)。

使用scan方式可以实现条件查询功能。scan在使用时有以下几点值得注意:

  • 1) scan可以通过setCaching与setBatch方法提高速度(以空间换时间)。
  • 2) scan可以通过setStartRow与setEndRow来限定范围([start,end)start是闭区间,end是开区间)。范围越小,性能越高。通过巧妙的RowKey设计使我们批量获取记录集合中的元素挨在一起(应该在同一个Region下),可以在遍历结果时获得很好的性能。
  • 3) scan可以通过setFilter方法添加过滤器,这也是分页、多条件查询的基础。

注:可在hbase-site.xml中配置缓存数量:

<property>
	<name>hbase.client.scanner.caching</name>
	<value>5</value>
</property>

请看下面使用Scan扫描的代码实现。

	// 在Scan()中,可以指定起始行和终止行,还可以指定时间范围作为过滤器(获得指定时间范围内的数据)
	public static void scan(String tableName) throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName)); // 表

		Scan scan = new Scan(); // 无参构造器的意思是扫描整个数据
		scan.setMaxVersions(2); // 指定获取的版本数量为2
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {  // Result代表获取的单行对象
			System.out.println("扫描到的行是: " + Bytes.toString(result.getRow()));
		}
		scanner.close();
		table.close();
	}

	// 扫描指定的行键范围
	public static void scan(String tableName, String startingRowKey, String stoppingRowKey, String family,
			String qualifier) throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName)); // 表

		Scan scan = new Scan(Bytes.toBytes(startingRowKey), Bytes.toBytes(stoppingRowKey));
		scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			byte[] value = result.getValue(Bytes.toBytes(family),Bytes.toBytes(qualifier));
			System.out.println(" " + Bytes.toString(result.getRow()) + " => " + Bytes.toString(value));
		}
		scanner.close();
		table.close();
	}

	// 以批处理的方式扫描
	public static void scanInBatch(String tableName, String family) throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName)); // 表

		Scan scan = new Scan();
		scan.addFamily(Bytes.toBytes(family));

		scan.setBatch(2);    // 每次rpc请求传输2行
		System.out.println("批处理数量: " + scan.getBatch());
		ResultScanner scanner = table.getScanner(scan);
		for (Result result : scanner) {
			System.out.println("数据: ");
			for (Cell cell : result.listCells()) {
				System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
			}
		}
		scanner.close();
		table.close();
	}

对数据进行删除和修改

对数据删除,需要用到org.apache.hadoop.hbase.client.Delete对象。

	// 删除整行
	public static void delete(String tableName, String rowkey) throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName)); // 表
		
		Delete delete = new Delete(Bytes.toBytes(rowkey)); 	// 构造Delete对象
		table.delete(delete);			// 从表中删除
		
		table.close();
	}
	
	// 删除一个列族中的给定列
	public static void delete(String tableName, String rowkey, String family,String qualifier) throws IOException {
		Table table = connection.getTable(TableName.valueOf(tableName)); // 表
		
		Delete delete = new Delete(Bytes.toBytes(rowkey));	// 构造Delete对象
		delete.addColumns(Bytes.toBytes(family),Bytes.toBytes(qualifier));// 删除指定列的所有版本
		table.delete(delete);
		
		table.close();
	}

《PySpark原理深入与编程实战》