本文將介紹HBase的客戶端連接實現,並說明如何正確管理HBase的連接。
最近在搭建一個HBase的可視化管理平臺,搭建完成後發現不管什麼查詢都很慢,甚至於使用api去listTable都要好幾秒。
經過一番排查發現,是每次請求的時候,都去臨時創建了一個connection,而創建connection非常耗時導致整體的rt上升。
因此,就深入瞭解了下如何正確管理HBase的connection,同時,也在優化過程中有些小細節的總結。
本文基於hbase 2.0.0版本的源碼,github上3.0版本的源碼已經有很大差異了,但是思想還是差不多的
1.HBase-client和HBase是如何連接的?
這個問題實際上在我之前的文章 深入HBase讀寫 中介紹過。
當HBase-client第一次請求讀寫的時候,需要三步走:
1)HBase-client從zk中獲取保存meta table的位置信息,知道meta table保存在了哪個region server,然後緩存這個位置信息;
2)HBase-client會查詢這個保存meta table的特定的region server,查詢meta table信息,在table中獲取自己想要訪問的row key所在的region在哪個region server上。
3)客戶端直接訪問目標region server,獲取對應的row
所以,我們知道hbase-client實際上包含三部分連接:
- 跟zk連接,獲取相關元信息
- 跟HMaster連接,做相關DDL操作
- 直接跟各個region server進行連接,進行增刪改查
2.HBase客戶端連接原理
常規寫法是這樣的
<code>Connection connection = ConnectionFactory.createConnection(conf); try { Table table = connection.getTable(TableName.valueOf("tablename”)); // 插入數據 Put put = new Put(Bytes.toBytes("row")); put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value")); table.put(put); // 單行讀取 Get get = new Get(Bytes.toBytes("row")); Result res = table.get(get); // 刪除一行數據 Delete delete = new Delete(Bytes.toBytes("row")); table.delete(delete); }catch (IOException e) { //..... } finally { table.close(); connection.close(); }/<code>
我們不禁有這樣的疑問:
1)HBase沒有連接池嗎?
2)connection表示的是一個連接嗎?
3)connection每個線程都得創建嗎?線程安全嗎?
4)table每個線程都得創建嗎?線程安全嗎?
下面一一解答。
首先,Connection是線程安全的,而Table和Admin則不是線程安全的。
因此正確的做法是一個進程(或服務)使用一個Connection對象,而在不同的線程中使用單獨的Table和Admin對象。
Connection持有RpcClient,RpcClient管理了一個連接池poolMap
<code>protected final PoolMap connections; //…. this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));/<code>
通過AbstractRpcClient的getConnection看到,連接T繼承RpcConnection,叫做NettyRpcConnection。
這裡順便通過getPoolType和getPoolSize看了下線程池的大小和類型。
在枚舉類PoolType中有三種線程池類型Reusable, ThreadLocal, RoundRobin,用戶可以用hbase.client.ipc.pool.type指定線程池類型,通過hbase.client.ipc.pool.size指定線程池大小(默認是1)。
3.優化實踐
搞清楚上面的原理後,下面就可以開始優化我們的HBase管理平臺了。
只需要對每個HBase集群的connection使用Map保存下來,每次請求的時候拿出對應的connection進去相關操作即可。然後需要注意在系統退出的時候關閉所有的connection。
上代碼:
<code>public class ConnectionManager { private Map connectionMap = new ConcurrentHashMap<>(); public Connection getConnection(String resourceId, Configuration configuration) { ResourceInfo resourceInfo = ResourceInfoCache.getResourceInfoByCache(resourceId); if (resourceInfo == null) { throw new IllegalArgumentException("error resourceid: " + resourceId); } String key = getClusterKey(resourceInfo); if (connectionMap.containsKey(key)) { return connectionMap.get(key); } synchronized (this) { if (connectionMap.containsKey(key)) { return connectionMap.get(key); } Connection connection = null; try { connection = ConnectionFactory.createConnection(configuration); } catch (IOException e) { return null; } connectionMap.put(key, connection); return connection; } } @PreDestroy public void doDestroy() { for (Map.Entry entry : connectionMap.entrySet()) { Connection connection = entry.getValue(); if (connection != null) { try { connection.close(); } catch (IOException e) { //。。。。 } } } } }/<code>
這裡有幾個注意點:
- 將ConnectionManager註冊為bean,交給spring容器管理生命週期,同時保證單例。
- 使用@PreDestroy保證應用關閉時,能正確釋放所有連接,避免連接洩漏
- connectionMap使用ConcurrentHashMap保證線程安全
- dcl檢查,避免重複創建同一個connection,浪費資源;並且避免重複創建connection後,無法關閉導致連接洩漏。
在需要查詢時,只需要通過getConnection獲取已經存在的connection即可。