Flink 1.9—SQL 創建 Kafka 數據源

前言

目前 Flink 1.9 SQL 支持用戶直接使用 SQL 語句創建 Kafka 數據源,這極大的方便了用戶開發 Flink 實時任務,你可以像 Hive 一樣,使用 Create Table 語句來創建 Kafka Source,同時在也可以使用 Select 語句,從這個表中讀取數據,進行窗口、ETL等操作。本文主要講解 Flink 1.9 SQL 創建 Kafka 的 SQL 語法使用,當然,使用這個功能的前提,是你選擇使用 Blink Planner。


Flink 1.9—SQL 創建 Kafka 數據源

Flink SQL Kafka Source DDL 語句

首先,一般你的 Kafka 數據源裡面的消息格式為 Json ,這樣在 Flink SQL 創建 Kafka 數據源的時候,指定消息格式為 Json,表中的定義的確保字段的名稱和 Json 中的字段保持一致,下面是 Flink SQL 代碼實例:

<code>create table kafka_topic_src(id varchar,name varchar,age varchar,) with ('connector.type' = 'kafka', 'connector.version' = '0.10','connector.topic' = 'your_topic','connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'your_consumer_id','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_bootstrap_servers','connector.property-version' = '1','connector.startup-mode' = 'latest-offset','format.type' = 'json','format.property-version' = '1','format.derive-schema' = 'true','update-mode' = 'append');/<code>

上面的 Flink SQL 語句中,定義了三個字段,id、name、age。所以你的 Json 數據格式要包含這三個字段,如果沒有包含某個字段,Flink 默認會使用 null 進行填充。

當然,你也可以使用 Json 中部分字段進行使用,比如你只需要 Json 中的 id、name,你也可以這樣定義:

<code>create table kafka_topic_src(id varchar,name varchar) with ('connector.type' = 'kafka', 'connector.version' = '0.10','connector.topic' = 'your_topic','connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'your_consumer_id','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_bootstrap_servers','connector.property-version' = '1','connector.startup-mode' = 'latest-offset','format.type' = 'json','format.property-version' = '1','format.derive-schema' = 'true','update-mode' = 'append');/<code>

注意,如果你的 kafka 消息不是 Json的話,Flink 任務會一直報錯,目前 Kafka 的 upadte-mode 只支持 append 模式。


Flink 1.9—SQL 創建 Kafka 數據源

Flink SQL Kafka Source DDL 屬性值

  1. connector.topic ,kafka Topic
  2. connector.startup-mode ,Flink kafka 消費者啟動模式
  3. format.type ,kafka 消息內容格式

Flink SQL Kafka Source DDL 注意點

Flink SQL 設置 kafka 消費者 group id

<code>'connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'track.log.teamtype.join' /<code>

這兩個參數一起來進行設置,在 with 後面的語句中。

設置 kafka bootstrap.servers

<code>'connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_kafka_boots_servers'/<code>

這兩個參數要一起設置,具體的 bootstrap.servers 就是你所使用 Topic 所在集群的鏈接信息。

Flink 1.9—SQL 創建 Kafka 數據源

結語

我是Lake,專注大數據技術原理、人工智能、數據庫技術、程序員經驗分享,如果我的問答對你有幫助的話,希望你能點贊關注我,感謝。

我會持續大數據、數據庫方面的內容,如果你有任何問題,也歡迎關注私信我,我會認真解答每一個問題,期待您的關注

Flink 1.9—SQL 創建 Kafka 數據源


分享到:


相關文章: