HBase 1.x in Practice: A BulkLoad Bulk Data Import Code Example



1. Overview:

HBase offers several ways to load data into a table. The most straightforward are the TableOutputFormat class in a MapReduce job, or the client API; however, these are not always the most efficient.

BulkLoad works in three steps. First, upload the text files (or data exported from another database) to HDFS. Second, convert the data into HFiles; this step requires a MapReduce job that emits the RowKey as the output key and a Put or Delete as the output value, and it creates one HFile per Region in the output directory. Third, the RegionServers use LoadIncrementalHFiles to move the HFiles into the corresponding Region directories. This approach does not tie up Region resources, imports massive amounts of data quickly, saves memory, and avoids the heavy I/O of frequent flush, split, and compact operations; combined with MapReduce it is both efficient and convenient.
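A minimal command-line sketch of these three steps, using the paths and table name from the example later in this article; invoking LoadIncrementalHFiles as a standalone tool is an alternative to calling doBulkLoad() from Java:

<code>
# Step 1: upload the source file to HDFS
hdfs dfs -mkdir -p /tmp/bulkLoadInput
hdfs dfs -put student.txt /tmp/bulkLoadInput/

# Step 2: run the MapReduce job below, which writes one HFile per Region
#         into /tmp/bulkLoadOutput

# Step 3: hand the HFiles to the RegionServers; besides doBulkLoad() in code,
#         HBase 1.x also ships LoadIncrementalHFiles as a command-line tool
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/bulkLoadOutput studentbulk
</code>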

Note:

Because BulkLoad does not write data through the HBase API but instead generates HFiles directly on HDFS, it does not record anything in the WAL (write-ahead log). If the cluster synchronizes data through the Replication mechanism, the bulk-loaded data will not be replicated, because Replication ships data by reading the WAL.
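If bulk-loaded data does need to be replicated, later HBase 1.x releases (1.3+, HBASE-13153) add dedicated settings for it. A hedged hbase-site.xml sketch, to be verified against your release; the cluster id value here is only an example:

<code>
<!-- enable replication of bulk-loaded HFiles (HBase 1.3+) -->
<property>
  <name>hbase.replication.bulkload.enabled</name>
  <value>true</value>
</property>
<!-- a unique id for the source cluster, required when the setting above is on -->
<property>
  <name>hbase.replication.cluster.id</name>
  <value>source-cluster-1</value>
</property>
</code>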


2. Hands-on:

Data preparation:

Prepare the data file student.txt and upload it to the /tmp/bulkLoadInput directory:

<code>
1,lujs1,11,71
2,lujs2,12,72
3,lujs3,13,73
4,lujs4,14,74
5,lujs5,15,75
6,lujs6,16,76
7,lujs7,17,77
8,lujs8,18,78
</code>

Create the studentbulk table in the hbase shell by running:

<code>create 'studentbulk','info'</code>
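Since the MapReduce step generates one HFile per Region, pre-splitting the table spreads the bulk load across several Regions. This is optional; the split points below are chosen only to suit this sample data (rowkeys 1 through 8):

<code>
create 'studentbulk', 'info', SPLITS => ['3', '6']
</code>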

Define a custom Mapper class that parses the input data and generates the key/value pairs for the HFiles:

<code>
package com.unicom.ljs.hbase125.study;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-01 14:58
 * @version: v1.0
 * @description: com.unicom.ljs.hbase125.study
 */
public class HFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /* Mapper generics: input key type, input value type, output key type, output value type */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String[] lineWords = value.toString().split(",");
        String rowKey = lineWords[0];
        ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes(rowKey));

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(lineWords[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(lineWords[2]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("score"), Bytes.toBytes(lineWords[3]));
        context.write(row, put);
    }
}
</code>

Main (driver) class code example:

<code>
package com.unicom.ljs.hbase125.study;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadData {

    public static Configuration conf = null;
    public static Connection conn = null;
    public static Table table = null;
    public static RegionLocator locator = null;
    public static Admin admin = null;
    public static final String tableName = "studentbulk";
    public static final String inputPath = "/tmp/bulkLoadInput/";
    public static final String outputPath = "/tmp/bulkLoadOutput/";

    public static void main(String[] args) {

        try {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "salver158.hadoop.unicom,salver31.hadoop.unicom,salver32.hadoop.unicom");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("zookeeper.znode.parent", "/hbase-unsecure");

            conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf(tableName));
            locator = conn.getRegionLocator(TableName.valueOf(tableName));
            admin = conn.getAdmin();

            Job job = Job.getInstance();
            job.setJarByClass(BulkLoadData.class);
            // Mapper and its map-side output key/value types
            job.setMapperClass(HFileMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            // Input and output formats
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            // Input and output paths
            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            // Configure the job for bulk load (HFile output)
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
            boolean result = job.waitForCompletion(true);
            System.out.println("MapReduce job result: " + result);

            /* Load the generated HFiles into the HBase table */
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
            load.doBulkLoad(new Path(outputPath), admin, table, locator);

        } catch (Exception e) {
            System.out.println("Error: " + e);
        } finally {
            System.out.println("BulkLoadData finished!!!");
        }
    }
}
</code>
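The example leaves the Admin, RegionLocator, Table, and Connection handles open when main() exits. A minimal cleanup sketch (all four types implement java.io.Closeable in the HBase 1.x client API) that could be added to the class and called at the end of the finally block:

<code>
/* Sketch: close the shared HBase handles opened in main(), e.g.
   closeQuietly(admin, locator, table, conn); */
private static void closeQuietly(java.io.Closeable... resources) {
    for (java.io.Closeable r : resources) {
        try {
            if (r != null) {
                r.close();
            }
        } catch (java.io.IOException e) {
            System.out.println("Failed to close resource: " + e);
        }
    }
}
</code>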

Finally, package the program into a jar that bundles its dependencies. The pom needs the following plugin:

<code>
<plugins>
    <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
                <manifest>
                    <mainClass>com.unicom.ljs.hbase125.study.BulkLoadData</mainClass>
                </manifest>
            </archive>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>assembly</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
</code>
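Building is then the usual Maven packaging step; assuming the project's artifactId and version match the jar name used below, it produces target/hbase125-1.0-SNAPSHOT-jar-with-dependencies.jar:

<code>mvn clean package</code>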

Upload the exported jar to any node in the cluster and submit it with the following command:

<code>hadoop jar hbase125-1.0-SNAPSHOT-jar-with-dependencies.jar com.unicom.ljs.hbase125.study.BulkLoadData</code>

After the job finishes, check the data to confirm that the bulk load succeeded.
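For example, in the hbase shell:

<code>
scan 'studentbulk'
get 'studentbulk', '1'
</code>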

