Reading and Writing HBase Tables with MapReduce Programs

The parallel computing capability of a MapReduce program can be used to read and write data in HBase. HBase can serve as the data source, as the data sink, or as both.
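The example below uses HBase only as a data sink. As a rough sketch of the source direction (not part of the example that follows), a TableMapper receives each row key together with its Result; the class name TableSourceMapper used here is an illustrative placeholder.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// A mapper that uses an HBase table as the data source: the input key is the
// row key and the input value is the full Result for that row.
public class TableSourceMapper extends TableMapper<Text, IntWritable> {
	private static final IntWritable one = new IntWritable(1);
	private Text rowKeyText = new Text();

	@Override
	public void map(ImmutableBytesWritable rowKey, Result result, Context context)
			throws IOException, InterruptedException {
		// Emit <row key, 1> as a placeholder for real per-row processing
		rowKeyText.set(Bytes.toString(rowKey.get(), rowKey.getOffset(), rowKey.getLength()));
		context.write(rowKeyText, one);
	}
}

In the driver, the map side would then be configured with something like TableMapReduceUtil.initTableMapperJob("mytable", new Scan(), TableSourceMapper.class, Text.class, IntWritable.class, job) instead of setMapperClass plus FileInputFormat (the table name "mytable" is a hypothetical placeholder).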

The following example demonstrates how word-count results produced by a MapReduce job are saved into HBase.

[Example] HBase and MapReduce.

1. Write the mapper

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountHBaseMapper extends Mapper<Object, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	@Override
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		StringTokenizer itr = new StringTokenizer(value.toString());
		while (itr.hasMoreTokens()) {
			word.set(itr.nextToken());
			context.write(word, one);		// emit <word, one> as the <key, value> pair
		}
		}
	}
}

2. Write the reducer

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WcHBaseReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)
								throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {			// sum the counts for this word
			sum += val.get();
		}

		// Use Bytes.toBytes(key.toString()) rather than key.getBytes():
		// Text.getBytes() returns the backing array, which may be longer than the actual text
		byte[] row = Bytes.toBytes(key.toString());
		Put put = new Put(row);    	// one row per word
		// column family "content", column qualifier "count", value is the count
		put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));

		context.write(new ImmutableBytesWritable(row), put);		// write the summed result
	}
}

3. Write the driver.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

	static final String TABLE_NAME = "wordcount";		// table name
	static final String COLUMN_FAMILY = "content";	// column family

	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 1) {
			System.err.println("语法: WordCountDriver ");
			System.exit(2);
		}

		Configuration conf = HBaseConfiguration.create(getConf());
		Connection connection = ConnectionFactory.createConnection(conf);

		// 1. Create the table
		TableName tableName = TableName.valueOf(TABLE_NAME);
		Admin admin = connection.getAdmin();
		if (admin.tableExists(tableName)) {
			System.out.println("表已经存在!正在重新创建......");
			admin.disableTable(tableName); // 先禁用表
			admin.deleteTable(tableName); // 再删除表
		}

		HTableDescriptor htd = new HTableDescriptor(tableName);
		HColumnDescriptor tcd = new HColumnDescriptor(COLUMN_FAMILY);
		htd.addFamily(tcd); 		// add the column family to the table descriptor
		admin.createTable(htd); 	// create the table
		admin.close();

		// 2. Configure and run the job
		Job job = Job.getInstance(conf, "WordCountHBase");
		job.setJarByClass(WordCountDriver.class);

		// Use WordCountHBaseMapper for the map phase
		job.setMapperClass(WordCountHBaseMapper.class);
		// Use WcHBaseReducer for the reduce phase, writing its output into the HBase table
		TableMapReduceUtil.initTableReducerJob(TABLE_NAME, WcHBaseReducer.class, job);

		// Set the map output key type to Text
		job.setMapOutputKeyClass(Text.class);
		// Set the map output value type to IntWritable
		job.setMapOutputValueClass(IntWritable.class);
		// (The reduce output types are configured by initTableReducerJob above.)

		// Set the input path for the job
		FileInputFormat.addInputPath(job, new Path(args[0]));

		// Execute the job by calling job.waitForCompletion(true)
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int status = ToolRunner.run(new WordCountDriver(), args);
		System.exit(status);
	}

}

4. Run and test. Submit the job for execution yourself and check the results in the table; a small verification sketch follows.
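Besides running scan 'wordcount' in the HBase shell, a small client program can check the output. The sketch below (the class name WordCountVerifier is illustrative) scans the wordcount table created by the driver and prints each word with its count.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class WordCountVerifier {
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		try (Connection connection = ConnectionFactory.createConnection(conf);
				Table table = connection.getTable(TableName.valueOf("wordcount"));
				ResultScanner scanner = table.getScanner(new Scan())) {
			for (Result result : scanner) {
				// Row key is the word; content:count holds the count as a string
				String word = Bytes.toString(result.getRow());
				String count = Bytes.toString(
						result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count")));
				System.out.println(word + "\t" + count);
			}
		}
	}
}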

