1. Overview:
HBase offers several ways to load data into a table. The most straightforward is to use the TableOutputFormat class in a MapReduce job, or the client API; however, these are not always the most efficient approaches.
BulkLoad works in three steps. First, upload the text files (or data extracted from another database) to HDFS. Second, run a MapReduce job that converts the data into HFiles, emitting the RowKey as the output key and a Put or Delete as the output value; in the output directory, one HFile is created per Region. Third, use LoadIncrementalHFiles to have the HFiles moved into the corresponding Region directories on the RegionServers. This approach does not tie up Region resources, imports massive amounts of data quickly, and saves memory by avoiding the heavy IO of frequent flush, split, and compact operations; combined with MapReduce it is both efficient and convenient.
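For reference, the final hand-off step can also be run from the command line with the LoadIncrementalHFiles tool that ships with HBase; the path and table name below are the ones used later in this article:

```
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/bulkLoadOutput studentbulk
```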
Note:
Because BulkLoad does not write data through the HBase API but generates HFiles directly on HDFS, no WAL (write-ahead log) entries are recorded. If the cluster synchronizes data through the Replication mechanism, the bulk-loaded portion of the data will not be replicated, since Replication works by shipping the WAL.
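If bulk-loaded data does need to be replicated, newer HBase releases (1.3 and later) can replicate bulk load events when the options below are enabled. This is a version-dependent sketch to verify against your own release, not part of this article's setup, and the cluster id value is a made-up example:

```xml
<!-- hbase-site.xml -->
<property>
  <name>hbase.replication.bulkload.enabled</name>
  <value>true</value>
</property>
<property>
  <!-- unique id for this cluster; "source-cluster-1" is a placeholder -->
  <name>hbase.replication.cluster.id</name>
  <value>source-cluster-1</value>
</property>
```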
2. Hands-on:
Data preparation:
Create the data file student.txt and upload it to the /tmp/bulkLoadInput directory:
```
1,lujs1,11,71
2,lujs2,12,72
3,lujs3,13,73
4,lujs4,14,74
5,lujs5,15,75
6,lujs6,16,76
7,lujs7,17,77
8,lujs8,18,78
```
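A typical way to stage the file, assuming student.txt sits in the current local directory:

```
hdfs dfs -mkdir -p /tmp/bulkLoadInput
hdfs dfs -put student.txt /tmp/bulkLoadInput/
```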
Create the studentbulk table in the hbase shell by running:
```
create 'studentbulk','info'
```
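Because the HFile-generation job writes one HFile per Region, a single-region table like this yields a single HFile. For larger datasets you would normally pre-split the table; the split points below are illustrative, chosen for this article's row keys, and not part of the original setup:

```
create 'studentbulk','info', SPLITS => ['3','6']
```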
Define a custom Mapper class that parses the input data and produces the key/value pairs used to generate the HFiles:
```java
package com.unicom.ljs.hbase125.study;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-01 14:58
 * @version: v1.0
 * @description: com.unicom.ljs.hbase125.study
 */
public class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /* Input key: LongWritable (byte offset of the line)
       Input value: Text (one line of the input file)
       Output key: ImmutableBytesWritable (the row key)
       Output value: Put */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line looks like: rowkey,name,age,score
        String[] lineWords = value.toString().split(",");
        String rowKey = lineWords[0];
        ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(lineWords[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(lineWords[2]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("score"), Bytes.toBytes(lineWords[3]));
        context.write(row, put);
    }
}
```
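Note that HFileOutputFormat2 requires its input to be totally ordered by row key. The Mapper does not have to handle this itself: the call to HFileOutputFormat2.configureIncrementalLoad() in the driver below installs a TotalOrderPartitioner (one partition per Region) and the PutSortReducer, which together produce the sorted output the HFile writer needs.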
Main class code example:
```java
package com.unicom.ljs.hbase125.study;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadData {

    public static Configuration conf = null;
    public static Connection conn = null;
    public static Table table = null;
    public static RegionLocator locator = null;
    public static Admin admin = null;

    public static final String tableName = "studentbulk";
    public static final String inputPath = "/tmp/bulkLoadInput/";
    public static final String outputPath = "/tmp/bulkLoadOutput/";

    public static void main(String[] args) {
        try {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "salver158.hadoop.unicom,salver31.hadoop.unicom,salver32.hadoop.unicom");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("zookeeper.znode.parent", "/hbase-unsecure");

            conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf(tableName));
            locator = conn.getRegionLocator(TableName.valueOf(tableName));
            admin = conn.getAdmin();

            // Pass conf so the HBase settings above reach the MapReduce job
            Job job = Job.getInstance(conf);
            job.setJarByClass(BulkLoadData.class);

            // Mapper class and its output key/value types
            job.setMapperClass(HFileMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);

            // Input/output formats
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);

            // Input and output paths
            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            // Configure the job for bulk load: partitioner, reducer, and
            // per-region HFile settings derived from the target table
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);

            boolean result = job.waitForCompletion(true);
            System.out.println("MapReduce result: " + result);

            /* Load the generated HFiles into the HBase table */
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
            load.doBulkLoad(new Path(outputPath), admin, table, locator);
        } catch (Exception e) {
            System.out.println("Error: " + e);
        } finally {
            System.out.println("BulkLoadData finished!!!");
        }
    }
}
```
Finally, package the program as a jar with its dependencies included; add the following plugin to the pom so the build produces a jar-with-dependencies:
```xml
<plugins>
    <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
                <manifest>
                    <mainClass>com.unicom.ljs.hbase125.study.BulkLoadData</mainClass>
                </manifest>
            </archive>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>assembly</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
```
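One note on this configuration: the assembly goal is deprecated in recent versions of maven-assembly-plugin in favor of single. If your Maven build warns about it, the execution can equivalently be written as:

```xml
<execution>
    <id>make-assembly</id>
    <phase>package</phase>
    <goals>
        <goal>single</goal>
    </goals>
</execution>
```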
Upload the exported jar to any node in the cluster and submit it with the following command:
```
hadoop jar hbase125-1.0-SNAPSHOT-jar-with-dependencies.jar com.unicom.ljs.hbase125.study.BulkLoadData
```
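One operational caveat: FileOutputFormat will fail the job if the output directory already exists, so clear it before re-running:

```
hdfs dfs -rm -r /tmp/bulkLoadOutput
```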
Once the job completes, query the table to confirm that the bulk load succeeded.
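For example, in the hbase shell:

```
scan 'studentbulk'
```

This should return the eight rows from student.txt, each with the info:name, info:age, and info:score columns populated.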