Nifi 部署分鐘級別同步數據(二)

之前說到了給出了每次生成的 sql ,現在講下大體的佈局。


Nifi 部署分鐘級別同步數據(二)

總體nifi佈局


第一個 processor 生成文件,文件裡含有屬性,設定的屬性有:

columns : 表的字段

dbTable : 表的名稱

delayMinute : 延遲的分鐘

startTime : 開始時間

endTime : 結束時間

level : 是否有效。

等這個文件生成後,就到了下一個 processor 上。這個控件就是 RouteOnAttribute ,任務就是過濾掉 level = -1 的無效文件。

下一個 processor 就是 UpdateAttribute ,這個控件的作用是增加 屬性。新增一個屬性:

database.name :就是設定數據庫的名稱。

再接下來就是分流了,把不同表生成的文件分到不同的方向上去,依舊需要 RouteOnAttribute 。

內部的設置是


Nifi 部署分鐘級別同步數據(二)

這個樣子,根據 dbTable 這個屬性進行分流。

分完流之後,再經過一個 UpdateAttribute ,內部可以設置自己表中的 hint 等相關信息。

最終就到了 ExecuteSQL 這個控件。這個控件就是去數據庫查詢數據,返回 Avro 格式的文件。

內部配置的 sql 就是

<code>select ${hint} ${columns} from ${dbTable} g
where ${timeColumn}>=to_date('${startTime}','yyyy-MM-dd HH24:mi:ss')
and ${timeColumn}< to_date('${endTime}','yyyy-MM-dd HH24:mi:ss')/<code>

那麼數據得到後,需要分區,因為數據存儲到 hdfs 上,需要對數據進行分區,不然所有的數據都放在一起加載很浪費。那麼此時就需要 PartitionRecord 這個控件,


Nifi 部署分鐘級別同步數據(二)

這個控件這麼配置,就會根據文件內容中的 PT1 這個字段進行分類。

例如 文件中 有2行記錄,第一行記錄中的 PT1 是 2020-02-11 ,第二行的記錄中的 PT1 是 2020-02-12 ,那麼經過 PartitionRecord 之後就會變成兩個文件,因為根據 PT1 切分了,而且每個文件的屬性裡又添加了 partitionColumn 這個值。一個文件裡的值的內容就是 2020-02-11 ,另一個文件裡的值的內容就是 2020-02-12 。

接下來就修改文件名,因為 hdfs 上文件名一模一樣的不能寫,會有問題的。

然後通過 PutParquet 將結果寫入進去就大功告成了。


Nifi 部署分鐘級別同步數據(二)

指定動態目錄 Directory , 這樣每個文件進來都會寫到各自的目錄上去,不會互相影響。


分享到:


相關文章: