一行配置作業性能提升53%!Flink SQL 性能之旅

作者|Nico Kruber翻譯|毛家琦校對|伍翀

最近,我們用 SQL 查詢做了一些實驗,這個查詢關聯了一些維表的豐富原始記錄。同時,我們也考慮如果使用 DataStream API 實現相同的任務,是否能夠從現有機器中激發出更多的性能。在本文中,我們想帶你一起看看這是否有可能發生,以及如何實現?我們還會為不同於 PoC 代碼的作業提供進一步的提示,並對未來的工作進行展望。

我們在 Azure Kubernetes 設置中執行了 10 個標準的 v3 實例(每個實例 2 個CPU)、Ververica Platform 2.0、Flink 1.10 以及每個實例單核 8 併發的實驗,這些作為這次試驗的背景[1]。我們將給出在穩定狀態(即 15 分鐘長基準的最後 5 分鐘)期間合併的所有源的平均吞吐量,即 numRecordsOutPerSecond。如下示例的源代碼可以從鏈接位置檢索:

https://github.com/verververica/lab-sql-vs-datastream

SQL 查詢

首先,讓我們看看我們試圖超越的查詢。下面概述的查詢來源於一個流式 SQL 作業中獲得的靈感。它執行來自 fact_table 的輸入流與幾個維表的連接,維表定義如下(維表中的數據為每個最多 100 個字符的隨機字符串):

一行配置作業性能提升53%!Flink SQL 性能之旅

這個作業的重要部分是,所有輸入表都可以被更改;它們作為流被使用。在這種情況下,我們需要確定關聯的是哪個版本的行。這就是 Flink 的 temporal table Join 的使用場景:在執行 join 時,fact_table 中的每一行都應該與來自相匹配維度表的最新行進行連接和合並。如果您進一步研究 temporal joins(通過 LATERAL TABLE 語句和圍繞表 dim_TABLE 1,…,dim_TABLE 5 的包裝時態表函數),會發現 Flink 還支持 event-time 連接,它可能會派上用場。然而,我們為了示例簡單,在這裡就不使用 event-time 了。

<code>SELECT
D1.col1 AS A,
D1.col2 AS B,
D1.col3 AS C,
D1.col4 AS D,
D1.col5 AS E,
D2.col1 AS F,
D2.col2 AS G,
...
D5.col4 AS X,
D5.col5 AS Y
FROM
fact_table,
LATERAL TABLE (dimension_table1(f_proctime)) AS D1,
LATERAL TABLE (dimension_table2(f_proctime)) AS D2,
LATERAL TABLE (dimension_table3(f_proctime)) AS D3,
LATERAL TABLE (dimension_table4(f_proctime)) AS D4,
LATERAL TABLE (dimension_table5(f_proctime)) AS D5
WHERE
fact_table.dim1 = D1.id
AND fact_table.dim2 = D2.id
AND fact_table.dim3 = D3.id
AND fact_table.dim4 = D4.id
AND fact_table.dim5 = D5.id/<code>

LateralTableJoin.java 裡的 DataStream 代碼為每個輸入表創建了一個流式輸入源,並將輸出轉換為一個 append 的 DataStream,然後導入了 DiscardingSink。在 Flink 1.10 中設置此 SQL 作業有兩種方法:使用舊的 Flink Planner 或使用新的 Blink Planner。讓我們看看這兩者有什麼不同。

舊 Flink Planner

舊 Planner 當前(Flink 1.10)是默認使用的,或者可以通過以下:

<code>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment 
.create(env,EnvironmentSettings.newInstance()。useOldPlanner()。build());/<code>

它將 SQL 查詢轉換為以下作業圖,作業圖中的每個節點包含了一些 chain operators,例如將 DataStream source 轉成 table、join 後選出列子集:

一行配置作業性能提升53%!Flink SQL 性能之旅

開箱即用,它提供 84279 個事件/秒的穩定吞吐量。

新 Blink Planner

Flink 的新 Blink Planner 實現了一些增強功能,例如改進的功能集,並且在性能表現上,儘可能地多使用二進制類型,以避免序列化/反序列化開銷。它可以在 StreamTableEnvironment 的初始化期間啟用:

<code>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment

.create(env,EnvironmentSettings.newInstance()。useBlinkPlanner()。build());/<code>

創建的作業圖看起來與舊的 Planner 沒有太大的不同點,並且具有類似的概念算子:

一行配置作業性能提升53%!Flink SQL 性能之旅

在這種情況下,只需啟用 Blink Planner,吞吐量就會略微提高到 89048 個事件/秒(+5%)。

對象重用

這兩個 Planner 創建的任務實際上都是由兩個鏈式算子組成的:源附加了某種錶轉換/字段選擇,在(temporal)join 之後附加了字段選擇。如果你還記得 Flink 是如何處理算子之間的數據對象交換的[2],你會注意到,鏈接算子之間的數據傳輸會經過序列化/反序列化/複製階段,以防止在下一個算子中修改對象時意外將其存儲在一個算子中。這種行為不僅會影響批處理程序,還會影響流處理作業,並且可以通過啟用對象重用來更改,這通常是非常危險的,但是在 Flink 的 Table/SQL API 中這是非常安全的:

<code>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig()。enableObjectReuse();/<code>

從下面的數字可以看出,啟用此開關會略微提高舊 Planner 的性能(+5%),這也可能是正常的結果波動。但這裡遇到的瓶頸可能是其他原因導致的。更重要的是,啟用對象重用可以顯著提高 Blink Planner 的吞吐量(+53%)。通常,使用的 SQL 函數/算子越多,啟用對象重用產生的影響就越大。

一行配置作業性能提升53%!Flink SQL 性能之旅

看到這些數字,您可能會想,為什麼使用新的 Blink planner 啟用對象重用比使用舊的 planner 更能提高性能。其原因在 Flink 的堆棧深處,而且可能與我們運行的查詢有關聯,因為我們在這個查詢中大量使用字符串:

如果沒有對象重用,在舊 Planner 中,同一任務的兩個算子之間的數據交換最終將通過 StringSerializer#copy(String)。在 Blink planner 中,它最終將調用 BinaryString#copy()。如果查看實現代碼,可以發現 StringSerializer#copy(String)可以依賴於 Java 字符串的不變性,因此可以高效的使用和傳遞引用。而 BinaryString#copy()需要複製底層 MemorySegment 的字節。通過啟用對象重用來避免複製,可以有效提升速度。然後刪除 StringSerializer-copy(String)調用可能只會輕微地減少開銷。

接下來,除了尋找 Table API 執行引擎和優化器的調優參數[3]之外,沒有太多當前可以優化的點了。然而,對於給定的工作,這些調整開關似乎都不能保證有進一步的改進。如果嘗試使用 profiler 進行進一步的調查,您將實際看到字符串序列化和反序列化以及 Table API 的二進制數據類型處理對總體性能的影響是最大的,其次是狀態訪問。

對於未來,有想法引入新的 source 和 sink 接口,可以直接處理二進制的數據類型。此外,用戶定義的函數目前正沿著 FLIP-65 方向進行修改,以便它們也可以直接處理二進制數據。這可以使得 UDFs 與內置函數一樣強大。這兩種增強都將進一步減少堆棧中的序列化開銷。

與 DataStream API 同行

我的第一個天真的想法是,我可以很輕易地通過使用 DataStream API 來擊敗這個吞吐量。我可以將相同的 joins 實現在一起,無需 SQL 的任何轉換層。這自然涉及到更多的編碼,於是我從使用 Java 開始。

第一次嘗試

我的第一個草圖大致圍繞這些代碼行展開,FactTable 和 dimensiontable 是與上面的 SQL 作業相同的源函數。你可以在 LateralStreamJoin1 找到它:

<code>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream factTableSource = env
.addSource(new FactTable(factTableRate, maxId))
.name("fact_table");
DataStream dimTable1Source = env
.addSource(new DimensionTable(dimTableRate, maxId))
.name("dim_table1");
// ...

DataStream joinedStream =
factTableSource
.keyBy(x -> x.dim1)
.connect(dimTable1Source.keyBy(x -> x.id))
.process(
new AbstractFactDimTableJoin<fact>() {
@Override
Fact1 join(Fact value, Dimension dim) {
return new Fact1(value, dim);
}
})
.name("join1").uid("join1")
.keyBy(x -> x.dim2)
.connect(dimTable2Source.keyBy(x -> x.id))
// ...

joinedStream.addSink(new DiscardingSink<>());
env.execute();/<fact>/<code>

通過合適的 join key 進行 keyBy 後,fact 表與每個維度一個接一個地聯接在一起。在這段代碼中,helper 類 AbstractFactDimTableJoin 實際上按照處理時間進行關聯:它跟蹤 processElement2 中每個鍵的最新維度數據對象,並且在 processElement1 中豐富的每個事實事件,如果有最新的狀態對象出現就會提取它們並填充到實時事件中。這裡的 PoC 將忽略缺少維度數據的事件。

<code>abstract class AbstractFactDimTableJoin extends CoProcessFunction {
protected transient ValueState dimState;

@Override
public void processElement1(IN1 value, Context ctx, Collector out) throws Exception {
Dimension dim = dimState.value();
if (dim == null) {
return;
}
out.collect(join(value, dim));
}

abstract OUT join(IN1 value, Dimension dim);

@Override
public void processElement2(Dimension value, Context ctx, Collector out) throws Exception {
dimState.update(value);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor dimStateDesc =
new ValueStateDescriptor<>("dimstate", Dimension.class);
this.dimState = getRuntimeContext().getState(dimStateDesc);
}
}
/<code>

總之,這創建了下面的作業圖,它與上面的 SQL 作業非常相似,但不需要 Table conversion,也不需要選擇相應的鍵—它內置在 join 函數中,該函數使用 Fact 衍生類來代表每次字段的擴展:Fact1、Fact2、…、Fact4、DenormalizedFact。

一行配置作業性能提升53%!Flink SQL 性能之旅

到目前為止,不管有沒有對象重用,我們都能比 Flink 的舊 planner 強 17%。然而,新的 Blink planner 在啟用對象重用之後,每秒就能夠擠出更多的事件。實際上,對象重用不應該對我們的數據流作業產生任何影響,因為沒有鏈式算子;關於下面 4% 的數字差異似乎來自基準測試期間的正常波動。

一行配置作業性能提升53%!Flink SQL 性能之旅

DataStream 作業的性能分析

實際上,我希望有更大的差異顯示,所以讓我們來看下 profiler,看看 CPU 時間花在哪裡。您可以使用任何您喜歡的 profiler;我將顯示來自 JMC 7 的結果(https://jdk.java.net/jmc/),並使用 Java 11 運行 LateralStreamJoin1 來獲得這些結果。如你所見,來自 DataOutputSerializer、StringValue 和反射訪問的序列化開銷超過了實際的業務邏輯開銷,例如 CopyOnWriteStateMap,它正在從 on-heap 狀態或 com.ververica.LateralStreamJoin1 中檢索匹配的維度數據。

一行配置作業性能提升53%!Flink SQL 性能之旅

這結果並不意外,因為(反)序列化通常總體上具有很高的成本,並且所呈現的作業本身沒有(很多)計算量— fact 流中的每個事件實際上只有一個狀態訪問。另一方面,如果我們從上面回憶起作業圖,那麼在 join1、join2、join3、join4 和 join5 中的 dim_table1 中的數據將(反)序列化,這是實際使用它的唯一/第一個位置。這與其他維度表類似並且是我們可以避免的。

減少序列化開銷

為了避免一次又一次地對同一數據進行連續(反)序列化,我們可以在源代碼處序列化一次,然後直接傳遞它,直到它真正被需要為止(例如在 join5 任務中)。使用我們的 source 生成器,最快的方法是添加執行此轉換的 Map 函數:

<code>class MapDimensionToBinary extends RichMapFunction<dimension> {

private transient TypeSerializer dimSerializer;
private transient DataOutputSerializer serializationBuffer;

@Override
public BinaryDimension map(Dimension value) throws Exception {
return BinaryDimension.fromDimension(value, dimSerializer, serializationBuffer);
}

@Override
public void open(Configuration parameters) throws Exception {
serializationBuffer = new DataOutputSerializer(100);
dimSerializer = TypeInformation.of(Dimension.class).createSerializer(getRuntimeContext().getExecutionConfig());
}
}/<dimension>/<code>

BinaryDimension 是一個簡單 POJO,一個 long 表示維度鍵,一個字節數組表示序列化數據。這是我們接下來要傳遞的對象。實際上,如果您查看 LateralStreamJoin2 的代碼,您將看到我們現在改用 Tuple2,…,Tuple5(為了簡單起見),這是為了進一步(微小的)性能提升,因為 Tuple 序列化比 POJO 序列化快一點。因此,作業的主體只輕微地改動了:包括使用了這些額外的 Map 映射,對參與的類型輕微修改了下,以及最後一級 join 之後的反序列化。

<code>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream factTableSource = env
.addSource(new FactTable(factTableRate, maxId))
.name("fact_table");
DataStream dimTable1Source = env
.addSource(new DimensionTable(dimTableRate, maxId))
.name("dim_table1");
// ...

DataStream joinedStream =
factTableSource
.keyBy(x -> x.dim1)
.connect(dimTable1Source.map(new MapDimensionToBinary()).keyBy(x -> x.id))
.process(
new AbstractFactDimTableJoin<fact>>() {
@Override
Tuple2<fact> join(Fact value, BinaryDimension dim) {
return Tuple2.of(value, dim.data);
}
})
.name("join1").uid("join1")
// ...
.keyBy(x -> x.f0.dim5)
.connect(dimTable5Source.map(new MapDimensionToBinary()).keyBy(x -> x.id))
.process(
new AbstractFactDimTableJoin<tuple5>, DenormalizedFact>() {
private TypeSerializer dimSerializer;

@Override
DenormalizedFact join(Tuple5<fact> value, BinaryDimension dim)
throws IOException {
DataInputDeserializer deserializerBuffer = new DataInputDeserializer();
deserializerBuffer.setBuffer(value.f1);
Dimension dim1 = dimSerializer.deserialize(deserializerBuffer);
// ...
return new DenormalizedFact(value.f0, dim1, dim2, dim3, dim4, dim5);
}

@Override
public void open(Configuration parameters) throws Exception {
dimSerializer =
TypeInformation.of(Dimension.class)
.createSerializer(getRuntimeContext().getExecutionConfig());
}
})
.name("join5").uid("join5");

joinedStream.addSink(new DiscardingSink<>());
env.execute();/<fact>/<tuple5>/<fact>/<fact>/<code>

儘管技術上不需要此代碼,但我們將聯接表的反序列化保留到 DenormalizedFact 對象。刪除它在比較中是不公平的,甚至在任何實際工作中,您都將繼續使用此數據或重新格式化它以獲得正確的輸出。

使用 profiler 對運行該修改版本的作業做性能分析,可以發現一些不同點:

運行此作業修改版本的探查器快照與其他有很多不同,雖然頂級 ThreadLocal 仍然來自(反)序列化(從 StringValue.readString()調用),但相當多的 CPU 時間變成了實際業務邏輯(例如 CopyOnWriteStateMap)。

一行配置作業性能提升53%!Flink SQL 性能之旅

這額外的效果也反映在我們的基準數值上。未啟用對象重用,優化後的 DataStream 作業現在大約比 Blink Planner 的 SQL join 快 70%。啟用對象重用後,降低了新的 map 算子以及最後一階段(寫入 sink)的開銷,並獲得了 13% 的提升。而 Blink Planner,開啟對象重用後可以獲得 53% 的顯著提升。

最後,我們優化後的 DataStream 作業比 SQL 的最佳成績要快 28%。

一行配置作業性能提升53%!Flink SQL 性能之旅

結論

與 DataStream API 相比,運行此實驗展示了 Blink planner 出色的性能,這給我留下了非常深刻的印象。此外,對 Table API 的進一步改進實際上也可以減少差距,特別是在需要用 DataStream API 橋接 Table API 的地方。使用 DataStream API 進一步提高作業的性能,相較於 Blink planner 來說需要更多對序列化棧的思考,以及對特定作業的微調。

本文所提出的技術基本上依賴於儘可能長時間保持序列化形式的數據(當通過網絡移交時),並且只在特定時候時反序列化它。這可能可以封裝在一些好用的工具方法或基類下,以提高最終代碼的可讀性和可用性。然而,相比於 SQL 的簡單,當您查看優化所需的代碼量時,還必須考慮可維護性和開發成本,從而做出權衡的選擇。更重要的是,如果 Blink planner 在將來引入一個專門的維度關聯,那麼將在底層作用應用這種優化,並且不會用戶對 SQL 查詢本身進行任何更改。

參考鏈接:

[1] https://www.ververica.com/blog/getting-started-with-ververica-platform-on-microsoft-azure-part-2[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/#operating-on-data-objects-in-functions[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html


分享到:


相關文章: