前言
目前 Flink 1.9 SQL 支持用戶直接使用 SQL 語句創建 Kafka 數據源,這極大的方便了用戶開發 Flink 實時任務,你可以像 Hive 一樣,使用 Create Table 語句來創建 Kafka Source,同時在也可以使用 Select 語句,從這個表中讀取數據,進行窗口、ETL等操作。本文主要講解 Flink 1.9 SQL 創建 Kafka 的 SQL 語法使用,當然,使用這個功能的前提,是你選擇使用 Blink Planner。
Flink SQL Kafka Source DDL 語句
首先,一般你的 Kafka 數據源裡面的消息格式為 Json ,這樣在 Flink SQL 創建 Kafka 數據源的時候,指定消息格式為 Json,表中的定義的確保字段的名稱和 Json 中的字段保持一致,下面是 Flink SQL 代碼實例:
<code>create table kafka_topic_src
(
id varchar,
name varchar,
age varchar,
) with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'your_topic',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'your_consumer_id',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'your_bootstrap_servers',
'connector.property-version' = '1',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append');/<code>
上面的 Flink SQL 語句中,定義了三個字段,id、name、age。所以你的 Json 數據格式要包含這三個字段,如果沒有包含某個字段,Flink 默認會使用 null 進行填充。
當然,你也可以使用 Json 中部分字段進行使用,比如你只需要 Json 中的 id、name,你也可以這樣定義:
<code>create table kafka_topic_src
(
id varchar,
name varchar
) with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'your_topic',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'your_consumer_id',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'your_bootstrap_servers',
'connector.property-version' = '1',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append');/<code>
注意,如果你的 kafka 消息不是 Json的話,Flink 任務會一直報錯,目前 Kafka 的 upadte-mode 只支持 append 模式。
Flink SQL Kafka Source DDL 屬性值
- connector.topic ,kafka Topic
- connector.startup-mode ,Flink kafka 消費者啟動模式
- format.type ,kafka 消息內容格式
Flink SQL Kafka Source DDL 注意點
Flink SQL 設置 kafka 消費者 group id
<code>'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'track.log.teamtype.join' /<code>
這兩個參數一起來進行設置,在 with 後面的語句中。
設置 kafka bootstrap.servers
<code>'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'your_kafka_boots_servers'/<code>
這兩個參數要一起設置,具體的 bootstrap.servers 就是你所使用 Topic 所在集群的鏈接信息。
結語
我是Lake,專注大數據技術原理、人工智能、數據庫技術、程序員經驗分享,如果我的問答對你有幫助的話,希望你能點贊關注我,感謝。
我會持續大數據、數據庫方面的內容,如果你有任何問題,也歡迎關注私信我,我會認真解答每一個問題,期待您的關注
閱讀更多 Lake說科技 的文章