Flink DataSource Trilogy, Part 2: Built-in Connectors

This is the second article in the "Flink DataSource Trilogy" series. In the previous installment we learned how to create DataSources with the StreamExecutionEnvironment API; today we practice Flink's built-in connectors, which are wired in through the addSource method of StreamExecutionEnvironment.


Today's hands-on work uses Kafka as the data source: first we receive and process String messages, then we receive JSON messages and deserialize them into bean instances.

Source code download

If you would rather not write the code yourself, the complete source for this series can be downloaded from GitHub.


The git repository contains several folders; the application for this chapter lives in the flinkdatasourcedemo folder.


Environment and versions

The environment and versions used in this session are as follows:

  1. JDK: 1.8.0_211
  2. Flink: 1.9.2
  3. Maven: 3.6.0
  4. OS: macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA: 2018.3.5 (Ultimate Edition)
  6. Kafka: 2.4.0
  7. Zookeeper: 3.5.5

Please make sure everything above is ready before moving on to the hands-on work.

Matching Flink and Kafka versions

  • The official Flink documentation explains Kafka version compatibility in detail: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
  • Pay particular attention to the universal Kafka connector mentioned there: introduced in Flink 1.7, it works with Kafka 1.0.0 and later.
  • That page also has a table listing, for each Kafka version, the Maven dependency and the consumer class to use; pick the ones that match your own Kafka version. The dependency and class used in this project are shown in the code below.


Hands-on: processing String messages

  • Create a topic named test001 on Kafka; a reference command:
```shell
./kafka-topics.sh \
  --create \
  --zookeeper 192.168.50.43:2181 \
  --replication-factor 1 \
  --partitions 2 \
  --topic test001
```
  • Continuing with the flinkdatasourcedemo project created in the previous chapter, open pom.xml and add the following dependency:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
```
  • Add a new class, Kafka240String.java, which connects to the broker and performs a WordCount on the String messages it receives (the Splitter class it uses is carried over from the previous chapter; a sketch of it follows the code):
```java
package com.bolingcavalry.connector;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Kafka240String {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(2);

        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );
        // start consuming from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // obtain the DataStream via addSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        // split the strings received from Kafka into words and count them, with a 5-second window
        dataStream
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka");
    }
}
```
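The Splitter class imported above is reused from the previous article and is not listed in this installment; for reference, here is a minimal sketch of what it is assumed to look like: a FlatMapFunction that splits each line on spaces and emits (word, 1) tuples, which is what the keyBy(0)/sum(1) calls above expect.

```java
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Assumed implementation of the Splitter used above:
 * splits each incoming line into words and emits (word, 1) tuples.
 */
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        if (value == null || value.isEmpty()) {
            return;
        }
        for (String word : value.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}
```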

Make sure the Kafka topic has been created, then run Kafka240String; you should see messages being consumed and word counts being printed, confirming the job works as expected.
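To send test messages, you can use the console producer that ships with Kafka; this is just a sketch of one way to drive the demo, and the broker address matches the one configured in the code, so adjust it for your environment:

```shell
./kafka-console-producer.sh \
  --broker-list 192.168.50.43:9092 \
  --topic test001
# then type a few lines interactively, e.g.:
#   hello flink
#   hello kafka
```

Word counts for each 5-second window are then printed to the console of the running Kafka240String job.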


The String-message exercise is complete; next, let's try messages in JSON format.

Hands-on: processing JSON messages

The JSON messages we receive next will be deserialized into bean instances, which requires a JSON library; I chose Gson.

  • Add the Gson dependency to pom.xml:
```xml
<dependency>
  <groupId>com.google.code.gson</groupId>
  <artifactId>gson</artifactId>
  <version>2.8.5</version>
</dependency>
```
  • Add the class Student.java, an ordinary bean with only two fields, id and name:
```java
package com.bolingcavalry;

public class Student {

    private int id;

    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```
  • Add the class StudentSchema.java, an implementation of the DeserializationSchema interface, used to deserialize JSON into Student instances:
```java
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class StudentSchema implements DeserializationSchema<Student>, SerializationSchema<Student> {

    private static final Gson gson = new Gson();

    /**
     * Deserialization: turn a byte array into a Student instance
     */
    @Override
    public Student deserialize(byte[] bytes) throws IOException {
        return gson.fromJson(new String(bytes), Student.class);
    }

    @Override
    public boolean isEndOfStream(Student student) {
        return false;
    }

    /**
     * Serialization: turn a Student instance into a byte array
     * (not needed in this demo, so an empty array is returned)
     */
    @Override
    public byte[] serialize(Student student) {
        return new byte[0];
    }

    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(Student.class);
    }
}
```

  • Add the class Kafka240Bean.java, which connects to the broker, converts each incoming JSON message into a Student instance, and counts how many times each name appears, again with a 5-second window:

```java
package com.bolingcavalry.connector;

import com.bolingcavalry.Student;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Kafka240Bean {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(2);

        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer with the custom StudentSchema
        FlinkKafkaConsumer<Student> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new StudentSchema(),
                properties
        );
        // start consuming from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // obtain the DataStream via addSource
        DataStream<Student> dataStream = env.addSource(flinkKafkaConsumer);

        // the JSON from Kafka is deserialized into Student instances;
        // count occurrences of each name, with a 5-second window
        dataStream.map(new MapFunction<Student, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Student student) throws Exception {
                        return new Tuple2<>(student.getName(), 1);
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Connector DataSource demo : kafka bean");
    }
}
```
  • When testing, send JSON-formatted strings to Kafka, and Flink will report the count for each name; a sketch of such a test follows below.
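A minimal way to do this is again with the Kafka console producer; the id and name values below are made-up sample data that simply match the Student bean's fields:

```shell
./kafka-console-producer.sh \
  --broker-list 192.168.50.43:9092 \
  --topic test001
# then paste a few JSON messages, one per line, e.g.:
#   {"id":1,"name":"Tom"}
#   {"id":2,"name":"Jerry"}
#   {"id":3,"name":"Tom"}
```

For every 5-second window, the Flink job then prints a (name, count) tuple for each name seen in that window.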

That concludes the hands-on work with built-in connectors; in the next chapter we will build a custom DataSource together.





