flink多目錄路徑讀取數據源

在使用 flink 讀取數據源時,經常會遇到從多個目錄讀取數據源的問題,例如像下面的 hdfs

路徑:

flink多目錄路徑讀取數據源

如上數據為小時級切割數據,文件夾命名是以:年-月-日-小時 格式,但如果想計算某一天的全部數據,如:2019-11-19 ,那怎麼才能讓 flink 自動讀取所有以 2019-11-19 開頭的目錄文件呢?

如果是使用 spark 開發,其實非常容易,直接傳入模糊匹配的路徑即可:

<code>spark.read.text("/data/2019-11-19*/*")
/<code>

但是在 flink 中卻沒有提供類似的方法,因此需要我們自己來擴展:

<code>// 外層父級目錄
String dir = "hdfs://namenode.yuankan.co/data";

Path path = new Path(dir);
Configuration configuration = new Configuration();
// 設置遞歸獲取文件
configuration.setBoolean("recursive.file.enumeration", true);

TextInputFormat textInputFormat = new TextInputFormat(path);
textInputFormat.supportsMultiPaths();
textInputFormat.configure(configuration);
textInputFormat.setFilesFilter(new FilePathFilter() {
\t@Override
\tpublic boolean filterPath(Path filePath) {
\t\t// 過濾想要的路徑
\t\treturn !filePath.toString().contains("2019-11-19");
\t}
});

env.readFile(textInputFormat,dir)

/<code>

整體思路也非常簡單,利用內置的 readFile 方法,讀取外層目錄文件夾,過濾篩選出需要使用的數據源即可。

博客:https://yuankan.co


分享到:


相關文章: