Flink番外篇~流與表的相互轉換

流處理可以產生很多的商業價值。許多組織都意識到了實時管理大量數據、快速響應以及為客戶提供大規模實時服務的好處,而業務邏輯明確的流應用程序則可以提供更多的競爭優勢。

Flink DataStream抽象是一個功能強大的API,該API允許您定義基本的、複雜的流式管道。

此外,它還提供了像 Async IO、ProcessFunctions這樣的底層操作。但對大部分用戶來說,這些深層次的底層API是用不到的,他們真正需要的是能解決80%用例且只需少數代碼就可搞定的API。

為了向更多用戶提供強大的流處理功能,Apache Flink社區開發了一套抽象更簡單,語法更簡潔的API,以便用戶可以專注於他們的業務邏輯(而非高級流概念)。

與其他API(例如:用來處理流中複雜事件的CEP)一起,Flink還提供了一個可統一批流處理的關係API:Table & SQL API(通常稱為Table API)。

最近,來自Alibaba,Huawei,data Artisans公司的貢獻者們決定進一步開發Table API。在過去的一年中,Table API已被完全重寫。從Flink 1.1開始,其核心就改為了Apache Calcite,後者能解析處理SQL、優化所有關係查詢。今天,Table API可以在批處理和流環境中使用統一語義來處理各種用例。

本文總結了Flink Table API的當前狀態,並展示了Apache Flink最近添加的一些功能。這裡介紹的功能包括統一訪問批、流數據,數據轉換,以及窗口算子。

下面的段落不僅提供了Table API的一般概述,還可說明將來關係API的潛力。

由於Table API是基於Flink核心API來構建的,因此DataStreams/DataSets和Table可以相互轉換(轉換過程不會產生太多開銷)。

我們還將展示:如何從不同的源創建表,並指定可以在本地或分佈式環境中執行的程序。

本文,雖然我們用的是Scala版本的Table API,但對於Java來說也說一樣的(具有等價功能的SQL API)。

Data Transformation and ETL

數據處理管道的常見任務是從一個或多個系統導入數據,執行一些轉換,然後再將轉換後的數據導出到另一個系統。

Table API可以幫助管理這些重複任務。 對於讀取數據,API提供了一組即用型TableSource,例如CsvTableSource和KafkaTableSource。如果這些即用型TableSource不能滿足您的要求,您可以實現自定義TableSource,這些TableSource可以隱藏流概念中那些陌生的配置細節(例如水印生成)。

假設我們有一個存儲了客戶信息的CSV文件,其值由“|”字符分隔,內容包含客戶標識符,名稱,上次更新的時間戳,以及以逗號分隔的偏好鍵值對:

42|Bob Smith|2016-07-23 16:10:11|color=12,length=200,size=200

下面的示例展示了在將文件數據轉換為常規DataStream程序前,如何利用Flink來讀取CSV文件並執行某些數據清理。

// 設置執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 配置table source
val customerSource = CsvTableSource.builder()
.path("/path/to/customer_data.csv")
.ignoreFirstLine()
.fieldDelimiter("|")
.field("id", Types.LONG)
.field("name", Types.STRING)
.field("last_update", Types.TIMESTAMP)
.field("prefs", Types.STRING)
.build()
// 命名table source
tEnv.registerTableSource("customers", customerSource)
// 定義table程序
val table = tEnv
.scan("customers")
.filter('name.isNotNull && 'last_update > "2016-01-01 00:00:00".toTimestamp)
.select('id, 'name.lowerCase(), 'prefs)
// 轉換為data stream
val ds = table.toDataStream[Row]
ds.print()
env.execute()

Table API附帶了大量的內置函數,通過這些函數,我們可以輕鬆地使用語言集成查詢語法(LINQ)來定義業務邏輯。

在上面的例子中,我們先過濾了無效用戶名,然後再查找最近更新了用戶偏好的記錄。同時,為了對數據進行規範化,我們還將姓名轉換為了小寫。

出於調試,這裡我們只將table轉換為了DataStream ,並將結果打印到了標準輸出中。

CsvTableSource既支持批處理環境,又支持流處理環境。如果程序員想在批處理程序中執行上述程序,那麼只需要將執行環境替換為ExecutionEnvironment,並將DataStream修改為DataSet即可。 Table API 程序本身是不需要改動的。

在上面的示例中,我們將table程序轉換為了Row對象的數據流。但,這並意味著只能用row數據類型。Table API支持底層API中的所有類型,例如: Java/Scala中的Tuples, Case Classes, POJOs,或能通過Kryo序列化的泛型類型。

假設我們不想要通用的row數據類型,而是想返回一個帶有下面格式的POJO類型:

class Customer {
var id: Int = _
var name: String = _
var update: Long = _
var prefs: java.util.Properties = _
}

要實現上述需求, 我們可在table程序中將CSV文件轉換成Customer對象。Flink會自動為我們創建對象和映射字段。

val ds = tEnv
.scan("customers")
.select('id, 'name, 'last_update as 'update, parseProperties('prefs) as 'prefs)
.toDataStream[Customer]

或許你已經注意到了,在上面的例子中,我們使用了一個函數來解析偏好字段。儘管Flink Table API 附帶了大量的內置函數,但通常還得為特定業務實現自定義的標量函數(scalar functions)。

在上面的例子中,我們使用了一個自定義函數parseProperties。下面的代碼片段就是該函數的實現。

object parseProperties extends ScalarFunction {
def eval(str: String): Properties = {
val props = new Properties()
str
.split(",")
.map(\\_.split("="))
.foreach(split => props.setProperty(split(0), split(1)))
props
}
}

標量函數(Scalar functions)可用來執行反序列化,抽取,或轉換等操作。通過覆蓋open()方法,我們甚至可以訪問運行時信息(例如:分佈緩存文件或指標)。注意:open()方法只會在運行時的任務生命週期中調用一次。

靜態和流式數據的統一窗口

另一種常見任務(特別是在處理連續數據時)是將流分割成有限大小的窗口來進行計算。

當前,Table API支持三種不同類型的窗口:滑動窗口(sliding windows),翻滾窗口(tumbling windows),以及會話窗口(session windows)。有關這三種窗口類型的定義,可參考Flink文檔。

這三種窗口都可基於事件時間或處理時間來處理事件。會話窗口可按時間間隔定義,滑動窗口和翻滾窗口可按時間間隔或行數來定義。

假設上面示例中的數據是客戶更新其偏好時產生的更新事件流,並假設TableSource中的所有事件都已正確地分配了時間戳和水印。

下面我們將再次使用LINQ風格來定義窗口。下面的示例會算出1天內更新的偏好次數。

table
.window(Tumble over 1.day on 'rowtime as 'w)
.groupBy('id, 'w)
.select('id, 'w.start as 'from, 'w.end as 'to, 'prefs.count as 'updates)

通過on()參數,我們可以指定窗口是否需按事件時間來處理。Table API假設在使用事件時間時已經正確地分配了時間戳和水印。時間戳小於最後接收水印的元素會被刪除。

由於時間戳的提取和水印的生成依賴於數據源,因此通常要由TableSource或上游DataStream來負責分配這些屬性。

下面的代碼展示瞭如何定義其它類型的窗口:

// 使用處理時間
table.window(Tumble over 100.rows as 'manyRowWindow)
// 使用事件時間
table.window(Session withGap 15.minutes on 'rowtime as 'sessionWindow)
table.window(Slide over 1.day every 1.hour on 'rowtime as 'dailyWindow)

由於批處理只是流處理的一種特例(批處理有明確的起點和終點), 因此這些窗口在批處理環境中也同樣適用。由於我們指定了一個名為“rowtime”的列,因此可在不修改table程序的情況下直接基於DataSet來運行上述代碼。這一點對於精確計算相當有用,因為它可以處理那些嚴重無序到達的延遲事件。

目前,Table API僅支持DataStream API中的“分組窗口(group windows)”。 其他窗口,如SQL的OVER條件窗口將在Flink 1.3中實現。

為了展示API的強大功能,下面的代碼片斷會展示一個高級示例:它會在一小時的滑動窗口上衰減移動平均值,並每秒返回一次聚合結果。在這個table程序中,最新訂單的權重比老訂單更重。這個例子是從Apache Calcite中借來的,它展示了未來Flink版本中Table API和SQL的可能性。

table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

用戶自定義Table函數

用戶自定義表函數是Flink 1.2中加入的新功能。 該功能對於那些包含非原子值的表列(在處理之前,需將這些非原子值提取和映射為單獨字段)非常有用。表函數可接受任意數目的標量值作為輸入,並可返回任意數目的行作為輸出(不僅僅輸出單個值),這一點類似於DataStream/DataSet API中的flatMap函數。之後,還可使用左外連接或交叉連接將表函數的輸出與表中的原始行連接。

還是使用前面提到的客戶表,假設現在我們想要生成一個包含顏色和大小偏好的表作為單獨的列。

那麼表程序看起來會像下面這樣:

// 創建一個表函數實例
val extractPrefs = new PropertiesExtractor()
// 派生行,並將它們與原始行進行連接
table
.join(extractPrefs('prefs) as ('color, 'size))
.select('id, 'username, 'color, 'size)

PropertiesExtractor是一個可提取顏色和大小的用戶自定義表函數。我們只對包含這兩個偏好的用戶感興趣,因此如果字符串值中沒有這兩個屬性,那麼不會發出任何內容。由於表程序使用的是交叉連接,因此它會過濾掉連接右側沒有結果的客戶。

class PropertiesExtractor extends TableFunction[Row] {
def eval(prefs: String): Unit = {
// 將string拆分成(key, value)對
val pairs = prefs
.split(",")
.map { kv =>
val split = kv.split("=")
(split(0), split(1))
}
val color = pairs.find(\\_.\\_1 == "color").map(\\_.\\_2)
val size = pairs.find(\\_.\\_1 == "size").map(\\_.\\_2)
//如果指定了顏色和大小,則發出一行數據
(color, size) match {
case (Some(c), Some(s)) => collect(Row.of(c, s))
case _ => // skip
}
}
override def getResultType = new RowTypeInfo(Types.STRING, Types.STRING)
}

結論

當下,人們對易於訪問和使用的流更感興趣。 雖然Flink Table API還在不斷髮展,但我們相信很快您就能使用純關係API來實現批流傳輸,甚至將現有的Flink作業轉換為表程序。

Table API已是一個非常有用的工具,因此您可在DataSet/DataStream抽象和Table抽象之間來回切換來應對限制和缺少功能。

支持Apache Hive UDF,外部目錄,更多TableSource,更多窗口,以及更多算子等功能將使Table API成為更有用的工具。 尤其是,即將推出的動態表格,它表明即使在2017年,新的關係API也打開了許多可能性大門。

原文:https://flink.apache.org/news/2017/03/29/table-sql-api-update.html


分享到:


相關文章: