Delta Lake的使用

Delta Lake 的使用初步:

一:數據庫是Oracle


二:將Oracle的數據導入到hdfs上,並且使用 Delta 的格式進行保存,需要支持合併、刪除等。

步驟上:

  1. 首先需要一個spark的jdbc程序,連接數據庫進行數據導入操作。spark.read.jdbc ,這個不需要贅述,這裡的竅門是 加入 hint 的操作。hint在數據庫中可以告訴 數據庫 走 用戶指定的索引。而在spark的jdbc的API上,看上去並沒有什麼地方可以加hint。
    之前做過嘗試,要不在 column中加入hint。因為spark去讀取數據庫的數據,肯定是執行了數據庫的sql的。
Delta Lake的使用

JDBCRDD

這裡可以看到,是生成了 sqlText 這句sql進行去數據庫執行,hint肯定是加在 ${columnList} 之前的,那要是我第一個column中加入hint是不是就可以了。
結果是差強人意,因為這樣sql執行沒有問題,但是spark的dataset中,column的校驗是同步過的,怎麼可能會有column的名稱叫 /*index*/開頭的字段。
後來看到可以對 options.tableQuery 做處理。


Delta Lake的使用

這裡的tableOrQuery 就是可以處理成一句查詢的sql。將表名改成一句查詢的sql,而在sql中加入hint,這樣就可以做到了。

<code>println(s"sql===> (select ${baseColumns} from ${lTable} where ${sql._1})t")
spark.read.jdbc(prop.getProperty("url"),
s"(select ${baseColumns} from ${lTable} where ${sql._1})t", prop)
.withColumn("pt", expr(s"date_format(${partitionKey},'yyyyMMdd')"))
.write.mode(SaveMode.Overwrite).partitionBy("pt").parquet(commonWritePath + "/" + lTable + "/z_index=" + sql._2)/<code>

這裡,,我們可以在 ${baseColumns}中加入hint,例如

baseColumns=/*+index(GE_BALANCE_DETAIL IDX_BALANCE_DETAIL_BALANCE_DAT) */id,created_time,bill_code,balance_type_id,site_id,balance_confirm_money,settlement_center_id,other_site_id,balance_date

這樣去數據庫查詢的時候,就可以返回我們想要的結果,hint也使用起來了,而且返回的 column 也不會有問題。
這樣結果返回後可以根據分區字段往hdfs寫數據了。此外因為spark的機制原因,執行action動作的時候,其他的任務無法運行,這樣比較浪費。因此可以通過

<code>val jobExecutor = Executors.newFixedThreadPool(inputParams.getOrElse("queryCnt", "2").toInt)
val latch = new CountDownLatch(sqls.length)/<code>

來開啟多線程,這樣可以同時跑多個action任務了,不過這裡需要解決 臨時文件被刪除,導致出現

<code>WARN TaskSetManager: Lost task: org.apache.spark.SparkException: Task failed while writing rows.* WARN TaskSetManager: Lost task:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):* No lease on /<code>

這樣的問題,可以通過寫不同的目錄來避免。

2.需要合併的數據寫入到hdfs之後,需要下一步,數據合併來。

首先是判斷合併的模式,模式如果是override,就說明是全覆蓋模式,這種最簡單來,直接使用

<code>val data = spark.read.jdbc(prop.getProperty("url"), lTable, prop)
(columns match {
case Array("*") => data
case other => data.selectExpr(columns: _*)
}).write.format("delta").mode("overwrite").option("mergeSchema", "true").option("overwriteSchema", "true")
.save(deltaTablePath)/<code>

就可以將結果以Delta的形式保存到hdfs上。

如果模式是 merge,這種就複雜點來。

需要進行delta的merge操作。

<code>updateDS.select("pt").distinct().collect().map(_.getAs[Int]("pt")).sortWith(_ < _)
.grouped(inputParams.getOrElse("groupCnt", "30").toInt)
.foreach(rows => {

println(s"pts = ${rows.mkString(",")}")

DeltaTable.forPath(spark, deltaTablePath).as("events")
.merge(
updateDS.filter(s"pt in (${rows.mkString(",")})")
.as("updates"),
s"events.pt in (${rows.mkString(",")}) and events.pt=updates.pt and events.${uniKey} = updates.${uniKey}")
.whenMatched.updateExpr(
updateColumns.filter(r => r.equalsIgnoreCase(uniKey) == false)
.map(r => {
r -> ("updates." + r)
}).toMap)
.whenNotMatched.insertExpr(
updateColumns
.map(r => {
r -> ("updates." + r)
}).toMap)
.execute()
})/<code>

這裡的 pt是分區,首先獲取 導入的數據含有的分區有哪些,然後以默認30個分區作為一個批次進行合併。合併的條件就是 分區相同,且 唯一性主鍵相同,進行合併。如果匹配到了,執行updated操作,記錄需要合併的字段。如果沒有匹配到就可以執行insert操作,這個有點像 oracle 的 merge into 操作。

3.數據都合併完之後,就是對於歷史數據的清算了,Delta Lake的delete操作是不會刪除hdfs上的文件的,而是添加墓碑標記,證明這些數據已經被刪除了,那麼通過delta的形式去讀取的時候是不會讀取到這些 delete 的數據的。
但是如果是通過parquet的格式去讀取,依舊可以讀取到這些已經被打上delete操作的數據的。
為了完全的進行物理刪除,需要執行Delta 的 vacuum 方法,執行這個方法後,Delta會完全物理刪除已經被打上標籤的 文件 和 目錄。

4.這樣就可以完成Delta的存儲了,不過是最基本的,後面還需要解決小文件合併,表結構修改等問題,就不在這裡講述了。


分享到:


相關文章: