Reading and Writing HBase Tables with MapReduce
The parallel computing power of a MapReduce program can be used to read and write data stored in HBase: HBase can serve as the data source, as the data sink, or as both at once.
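The walkthrough below covers the sink case. For the source case, the mapper extends TableMapper and the driver wires up a Scan with TableMapReduceUtil.initTableMapperJob instead of FileInputFormat. A minimal sketch, assuming a readable table named wordcount (the class name HBaseSourceMapper and the emitted values are illustrative, not part of the example below):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

public class HBaseSourceMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable row, Result columns, Context context)
            throws IOException, InterruptedException {
        // Each map() call receives one row of the scanned HBase table.
        context.write(new Text(row.get()), new Text(columns.toString()));
    }
}

In the driver, the scan configuration then replaces the file input path:

Scan scan = new Scan();
scan.setCaching(500);        // fetch more rows per RPC for scan-heavy MR jobs
scan.setCacheBlocks(false);  // recommended for MR scans: don't fill the block cache
TableMapReduceUtil.initTableMapperJob("wordcount", scan,
        HBaseSourceMapper.class, Text.class, Text.class, job);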
The following example demonstrates counting words with MapReduce and saving the results into an HBase table.
[Example] HBase and MapReduce.
1. Write the mapper.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountHBaseMapper extends Mapper<Object, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split each input line into words and emit <word, 1> for each one.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
        }
    }
}
2. Write the reducer.
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WcHBaseReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {   // sum the counts for this word
            sum += val.get();
        }
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);         // one row per word
        // Column family "content", qualifier "count", value = the total.
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"),
                Bytes.toBytes(String.valueOf(sum)));
        context.write(new ImmutableBytesWritable(rowKey), put);  // emit the row
    }
}
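Note that the row key is built with Bytes.toBytes(key.toString()) rather than key.getBytes(): Text.getBytes() returns the Text's internal backing array, which may be longer than the valid content, so using it directly can leak stale trailing bytes into the row key.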
3. Write the driver.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

    static final String TABLE_NAME = "wordcount";   // output table
    static final String COLUMN_FAMILY = "content";  // column family

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: WordCountDriver <input path>");
            System.exit(2);
        }
        Configuration conf = HBaseConfiguration.create();

        // 1. Create the output table (drop and recreate it if it already exists).
        TableName tableName = TableName.valueOf(TABLE_NAME);
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            if (admin.tableExists(tableName)) {
                System.out.println("Table already exists! Recreating......");
                admin.disableTable(tableName);  // disable the table first
                admin.deleteTable(tableName);   // then delete it
            }
            HTableDescriptor htd = new HTableDescriptor(tableName);
            htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY)); // add the column family
            admin.createTable(htd);             // create the table
        }

        // 2. Configure and run the job.
        Job job = Job.getInstance(conf, "WordCountHBase");
        job.setJarByClass(WordCountDriver.class);
        // The map phase uses WordCountHBaseMapper.
        job.setMapperClass(WordCountHBaseMapper.class);
        // The reduce phase writes into the HBase table; initTableReducerJob also
        // sets TableOutputFormat and the target table name for the job.
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, WcHBaseReducer.class, job);
        // Declare the map output types: key is Text, value is IntWritable.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input path for the job.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Submit the job and wait for it to finish.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new WordCountDriver(), args);
        System.exit(status);
    }
}
4. Run and test. Submit the job yourself and check the results in the table.
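A sketch of one way to submit the job and inspect the output (the jar name and input path are illustrative; the HBase jars must be on the job's classpath, which `hbase mapredcp` prints):

export HADOOP_CLASSPATH=$(hbase mapredcp)
hadoop jar wordcount-hbase.jar WordCountDriver /input/books

Then verify the result in the HBase shell:

hbase shell
scan 'wordcount'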