Reading and Writing HBase Tables from MapReduce Programs
MapReduce (MR) programs can use their parallel computation to read and write data in HBase: HBase can serve as the data source, as the data sink, or as both.
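When HBase is the data source, the mapper extends TableMapper and the job is wired up with TableMapReduceUtil.initTableMapperJob. Below is a minimal sketch of the reading side; the table name source_table and the class HBaseSourceMapper are hypothetical, and the Scan settings follow the usual recommendations for MapReduce scans.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical mapper: each map() call receives one HBase row as a Result
public class HBaseSourceMapper extends TableMapper<Text, IntWritable> {
    @Override
    public void map(ImmutableBytesWritable row, Result result, Context context)
            throws IOException, InterruptedException {
        // extract cells from result and emit intermediate key-value pairs...
    }
}

The driver would then initialize the map side like this (instead of FileInputFormat):

Scan scan = new Scan();
scan.setCaching(500);       // fetch more rows per RPC round trip
scan.setCacheBlocks(false); // recommended for MapReduce scans
TableMapReduceUtil.initTableMapperJob("source_table", scan,
        HBaseSourceMapper.class, Text.class, IntWritable.class, job);

initTableMapperJob sets the job's input format to TableInputFormat and registers the scan, so no input path is needed on the reading side.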
The following example demonstrates how to compute a word count with MapReduce and save the results in HBase.
[Example] HBase and MapReduce.
1. Write the mapper. It splits each input line into words and emits a <word, 1> pair for every word.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountHBaseMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString()); // split the line into words
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE); // emit <word, 1> for each word
        }
    }
}
2. Write the reducer. It sums the counts for each word and writes the total into HBase, one row per word, via a Put.
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WcHBaseReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) { // sum the counts for this word
            sum += val.get();
        }
        // Bytes.toBytes(key.toString()) is used instead of key.getBytes():
        // Text.getBytes() returns the backing buffer, which may contain stale
        // bytes beyond getLength()
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey); // one row per word
        // Column family "content", qualifier "count", value is the total count
        put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"),
                Bytes.toBytes(String.valueOf(sum)));
        context.write(new ImmutableBytesWritable(rowKey), put); // emit the summed result
    }
}
3. Write the driver.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountDriver extends Configured implements Tool {
    static final String TABLE_NAME = "wordcount";   // output table name
    static final String COLUMN_FAMILY = "content";  // column family

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: WordCountDriver <input-path>");
            return 2;
        }
        Configuration conf = HBaseConfiguration.create();
        // 1. Create the output table, dropping any existing copy first
        TableName tableName = TableName.valueOf(TABLE_NAME);
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            if (admin.tableExists(tableName)) {
                System.out.println("Table already exists! Recreating...");
                admin.disableTable(tableName); // a table must be disabled before it can be deleted
                admin.deleteTable(tableName);
            }
            HTableDescriptor htd = new HTableDescriptor(tableName);
            HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY);
            htd.addFamily(hcd);     // add the column family
            admin.createTable(htd); // create the table
        }
        // 2. Configure and run the job
        Job job = Job.getInstance(conf, "WordCountHBase");
        job.setJarByClass(WordCountDriver.class);
        // WordCountHBaseMapper implements the map phase
        job.setMapperClass(WordCountHBaseMapper.class);
        // Wire the reduce phase to HBase: this sets the reducer class,
        // the TableOutputFormat, and the target table name
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, WcHBaseReducer.class, job);
        // The mapper emits Text keys and IntWritable values
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Input path for the source text files
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Run the job and wait for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new WordCountDriver(), args);
        System.exit(status);
    }
}
4. Run and test. Submit the job yourself and inspect the results stored in the table.
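To check the output without the HBase shell, a short client program can scan the wordcount table. This is a minimal sketch under the schema created above (family content, qualifier count); the class name WordCountVerify is just for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class WordCountVerify {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("wordcount"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String word = Bytes.toString(result.getRow()); // row key is the word
                String count = Bytes.toString(
                        result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count")));
                System.out.println(word + "\t" + count);
            }
        }
    }
}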