SHC:使用 Spark SQL 高效地讀寫 HBase

Apache Spark 和 Apache HBase 是兩個使用比較廣泛的大數據組件。很多場景需要使用 Spark 分析/查詢 HBase 中的數據,而目前 Spark 內置是支持很多數據源的,其中就包括了 HBase,但是內置的讀取數據源還是使用了 TableInputFormat 來讀取 HBase 中的數據。這個 TableInputFormat 有一些缺點:

  • 一個 Task 裡面只能啟動一個 Scan 去 HBase 中讀取數據;
  • TableInputFormat 中不支持 BulkGet;
  • 不能享受到 Spark SQL 內置的 catalyst 引擎的優化。

基於這些問題,來自 Hortonworks 的工程師們為我們帶來了全新的 Apache Spark—Apache HBase Connector,下面簡稱 SHC。通過這個類庫,我們可以直接使用 Spark SQL 將 DataFrame 中的數據寫入到 HBase 中;而且我們也可以使用 Spark SQL 去查詢 HBase 中的數據,在查詢 HBase 的時候充分利用了 catalyst 引擎做了許多優化,比如分區修剪(partition pruning),列修剪(column pruning),謂詞下推(predicate pushdown)和數據本地性(data locality)等等。因為有了這些優化,通過 Spark 查詢 HBase 的速度有了很大的提升。

注意:SHC 同時還提供了將 DataFrame 中的數據直接寫入到 HBase 中,但是整個代碼並沒有什麼優化的地方,所以本文對這部分不進行介紹。感興趣的讀者可以直接到這裡查看相關寫數據到 HBase 的代碼。

SHC 是如何實現查詢優化的呢

SHC 主要使用下面的幾種優化,使得 Spark 獲取 HBase 的數據掃描範圍得到減少,提高了數據讀取的效率。

將使用 Rowkey 的查詢轉換成 get 查詢

我們都知道,HBase 中使用 Get 查詢的效率是非常高的,所以如果查詢的過濾條件是針對 RowKey 進行的,那麼我們可以將它轉換成 Get 查詢。為了說明這點,我們使用下面的例子進行說明。假設我們定義好的 HBase catalog 如下:


SHC:使用 Spark SQL 高效地讀寫 HBase



那麼如果有類似下面的查詢


SHC:使用 Spark SQL 高效地讀寫 HBase


因為查詢條件直接是針對 RowKey 進行的,所以這種情況直接可以轉換成 Get 或者 BulkGet 請求的。第一個 SQL 查詢過程類似於下面過程

SHC:使用 Spark SQL 高效地讀寫 HBase

如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

後面兩條 SQL 查詢其實是等效的,在實現上會把 key in (x1, x2, x3..) 轉換成 (key == x1) or (key == x2) or ... 的。整個查詢流程如下:

SHC:使用 Spark SQL 高效地讀寫 HBase


如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

如果我們的查詢裡面有 Rowkey 還有其他列的過濾,比如下面的例子

sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 and col7 = 'xxx'")

那麼上面的 SQL 翻譯成 HBase 的下面查詢


SHC:使用 Spark SQL 高效地讀寫 HBase


如果有多個 and 條件,都是使用 SingleColumnValueFilter 進行過濾的,這個都好理解。如果我們有下面的查詢

sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 or col7 = 'xxx'")

那麼在 shc 裡面是怎麼進行的呢?事實上,如果碰到非 RowKey 的過濾,那麼這種查詢是需要掃描 HBase 的全表的。上面的查詢在 shc 裡面就是將 HBase 裡面的所有數據拿到,然後傳輸到 Spark ,再通過 Spark 裡面進行過濾,可見 shc 在這種情況下效率是很低下的。

注意,上面的查詢在 shc 返回的結果是錯誤的。具體原因是在將 id = 1 or col7 = 'xxx' 查詢條件進行合併時,丟棄了所有的查找條件,相當於返回表的所有數據。定位到代碼可以參見下面的


SHC:使用 Spark SQL 高效地讀寫 HBase


同理,類似於下面的查詢在 shc 裡面其實都是全表掃描,並且將所有的數據返回到 Spark 層面上再進行一次過濾。


SHC:使用 Spark SQL 高效地讀寫 HBase


很顯然,這種方式查詢效率並不高,一種可行的方案是將算子下推到 HBase 層面,在 HBase 層面通過 SingleColumnValueFilter 過濾一部分數據,然後再返回到 Spark,這樣可以節省很多數據的傳輸。

組合 RowKey 的查詢優化

shc 還支持組合 RowKey 的方式來建表,具體如下:


SHC:使用 Spark SQL 高效地讀寫 HBase


上面的 col00 和 col01 兩列組合成一個 rowkey,並且 col00 排在前面,col01 排在後面。比如 col00 ='row002',col01 = 2,那麼組合的 rowkey 為 row002\\x00\\x00\\x00\\x02。那麼在組合 Rowkey 的查詢 shc 都有哪些優化呢?現在我們有如下查詢

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0").show()

根據上面的信息,RowKey 其實是由 col00 和 col01 組合而成的,那麼上面的查詢其實可以將 col00 和 col01 進行拼接,然後組合成一個 RowKey,然後上面的查詢其實可以轉換成一個 Get 查詢。但是在 shc 裡面,上面的查詢是轉換成一個 scan 和一個 get 查詢的。scan 的 startRow 為 row000,endRow 為 row000\\xff\\xff\\xff\\xff;get 的 rowkey 為 row000\\xff\\xff\\xff\\xff,然後再將所有符合條件的數據返回,最後再在 Spark 層面上做一次過濾,得到最後查詢的結果。因為 shc 裡面組合鍵查詢的代碼還沒完善,所以當前實現應該不是最終的。

在 shc 裡面下面兩條 SQL 查詢下沉到 HBase 的邏輯一致

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000'").show()

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0").show()

唯一區別是在 Spark 層面上的過濾。

scan 查詢優化

如果我們的查詢有 < 或 > 等查詢過濾條件,比如下面的查詢條件:

df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 > 'row000' and col00 < 'row005'").show()

這個在 shc 裡面轉換成 HBase 的過濾為一條 get 和 一個 scan,具體為 get 的 Rowkey 為 row0005\\xff\\xff\\xff\\xff;scan 的 startRow 為 row000,endRow 為 row005\\xff\\xff\\xff\\xff,然後將查詢的結果返回到 spark 層面上進行過濾。

總體來說,shc 能在一定程度上對查詢進行優化,避免了全表掃描。但是經過評測,shc 其實還有很多地方不夠完善,算子下沉並沒有下沉到 HBase 層面上進行。目前這個項目正在和 hbase 自帶的 connectors 進行整合(https://github.com/apache/hbase-connectors),相關 issue 參見 Enhance the current spark-hbase connector。

私信回覆小編“學習”即可獲得更多學習資料

SHC:使用 Spark SQL 高效地讀寫 HBase


分享到:


相關文章: