之前說到了給出了每次生成的 sql ,現在講下大體的佈局。
第一個 processor 生成文件,文件裡含有屬性,設定的屬性有:
columns : 表的字段
dbTable : 表的名稱
delayMinute : 延遲的分鐘
startTime : 開始時間
endTime : 結束時間
level : 是否有效。
等這個文件生成後,就到了下一個 processor 上。這個控件就是 RouteOnAttribute ,任務就是過濾掉 level = -1 的無效文件。
下一個 processor 就是 UpdateAttribute ,這個控件的作用是增加 屬性。新增一個屬性:
database.name :就是設定數據庫的名稱。
再接下來就是分流了,把不同表生成的文件分到不同的方向上去,依舊需要 RouteOnAttribute 。
內部的設置是
這個樣子,根據 dbTable 這個屬性進行分流。
分完流之後,再經過一個 UpdateAttribute ,內部可以設置自己表中的 hint 等相關信息。
最終就到了 ExecuteSQL 這個控件。這個控件就是去數據庫查詢數據,返回 Avro 格式的文件。
內部配置的 sql 就是
<code>select ${hint} ${columns} from ${dbTable} g
where ${timeColumn}>=to_date('${startTime}','yyyy-MM-dd HH24:mi:ss')
and ${timeColumn}< to_date('${endTime}','yyyy-MM-dd HH24:mi:ss')/<code>
那麼數據得到後,需要分區,因為數據存儲到 hdfs 上,需要對數據進行分區,不然所有的數據都放在一起加載很浪費。那麼此時就需要 PartitionRecord 這個控件,
這個控件這麼配置,就會根據文件內容中的 PT1 這個字段進行分類。
例如 文件中 有2行記錄,第一行記錄中的 PT1 是 2020-02-11 ,第二行的記錄中的 PT1 是 2020-02-12 ,那麼經過 PartitionRecord 之後就會變成兩個文件,因為根據 PT1 切分了,而且每個文件的屬性裡又添加了 partitionColumn 這個值。一個文件裡的值的內容就是 2020-02-11 ,另一個文件裡的值的內容就是 2020-02-12 。
接下來就修改文件名,因為 hdfs 上文件名一模一樣的不能寫,會有問題的。
然後通過 PutParquet 將結果寫入進去就大功告成了。
指定動態目錄 Directory , 這樣每個文件進來都會寫到各自的目錄上去,不會互相影響。
閱讀更多 豆豆杭哲 的文章