Flink 1.9—SQL 创建 Kafka 数据源

前言

目前 Flink 1.9 SQL 支持用户直接使用 SQL 语句创建 Kafka 数据源,这极大的方便了用户开发 Flink 实时任务,你可以像 Hive 一样,使用 Create Table 语句来创建 Kafka Source,同时在也可以使用 Select 语句,从这个表中读取数据,进行窗口、ETL等操作。本文主要讲解 Flink 1.9 SQL 创建 Kafka 的 SQL 语法使用,当然,使用这个功能的前提,是你选择使用 Blink Planner。


Flink 1.9—SQL 创建 Kafka 数据源

Flink SQL Kafka Source DDL 语句

首先,一般你的 Kafka 数据源里面的消息格式为 Json ,这样在 Flink SQL 创建 Kafka 数据源的时候,指定消息格式为 Json,表中的定义的确保字段的名称和 Json 中的字段保持一致,下面是 Flink SQL 代码实例:

<code>create table kafka_topic_src(id varchar,name varchar,age varchar,) with ('connector.type' = 'kafka', 'connector.version' = '0.10','connector.topic' = 'your_topic','connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'your_consumer_id','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_bootstrap_servers','connector.property-version' = '1','connector.startup-mode' = 'latest-offset','format.type' = 'json','format.property-version' = '1','format.derive-schema' = 'true','update-mode' = 'append');/<code>

上面的 Flink SQL 语句中,定义了三个字段,id、name、age。所以你的 Json 数据格式要包含这三个字段,如果没有包含某个字段,Flink 默认会使用 null 进行填充。

当然,你也可以使用 Json 中部分字段进行使用,比如你只需要 Json 中的 id、name,你也可以这样定义:

<code>create table kafka_topic_src(id varchar,name varchar) with ('connector.type' = 'kafka', 'connector.version' = '0.10','connector.topic' = 'your_topic','connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'your_consumer_id','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_bootstrap_servers','connector.property-version' = '1','connector.startup-mode' = 'latest-offset','format.type' = 'json','format.property-version' = '1','format.derive-schema' = 'true','update-mode' = 'append');/<code>

注意,如果你的 kafka 消息不是 Json的话,Flink 任务会一直报错,目前 Kafka 的 upadte-mode 只支持 append 模式。


Flink 1.9—SQL 创建 Kafka 数据源

Flink SQL Kafka Source DDL 属性值

  1. connector.topic ,kafka Topic
  2. connector.startup-mode ,Flink kafka 消费者启动模式
  3. format.type ,kafka 消息内容格式

Flink SQL Kafka Source DDL 注意点

Flink SQL 设置 kafka 消费者 group id

<code>'connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'track.log.teamtype.join' /<code>

这两个参数一起来进行设置,在 with 后面的语句中。

设置 kafka bootstrap.servers

<code>'connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_kafka_boots_servers'/<code>

这两个参数要一起设置,具体的 bootstrap.servers 就是你所使用 Topic 所在集群的链接信息。

Flink 1.9—SQL 创建 Kafka 数据源

结语

我是Lake,专注大数据技术原理、人工智能、数据库技术、程序员经验分享,如果我的问答对你有帮助的话,希望你能点赞关注我,感谢。

我会持续大数据、数据库方面的内容,如果你有任何问题,也欢迎关注私信我,我会认真解答每一个问题,期待您的关注

Flink 1.9—SQL 创建 Kafka 数据源


分享到:


相關文章: