This is the third article in the "Flink Sink in Action" series. It walks through the official Flink cassandra connector. The overall flow is shown in the figure below: we read strings from kafka, run a wordcount on them, then both print the results and write them to cassandra:
Software versions
The software versions used in this article are:
- cassandra: 3.11.6
- kafka: 2.4.0 (scala: 2.12)
- jdk: 1.8.0_191
- flink: 1.9.2
- maven: 3.6.0
- OS running flink: CentOS Linux release 7.7.1908
- OS running cassandra: CentOS Linux release 7.7.1908
- IDEA: 2018.3.5 (Ultimate Edition)
About cassandra
The cassandra used here is a three-node cluster. If you want to deploy such a cluster quickly, see "Deploy a cassandra3 cluster quickly with ansible".
Prepare the cassandra keyspace and table
First create the keyspace and table:
- Log in to cassandra with cqlsh:
```shell
cqlsh 192.168.133.168
```
- Create the keyspace (3 replicas):
```sql
CREATE KEYSPACE IF NOT EXISTS example
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};
```
- Create the table:
```sql
CREATE TABLE IF NOT EXISTS example.wordcount (
  word text,
  count bigint,
  PRIMARY KEY(word)
);
```
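Optionally, the setup can also be verified from Java. Below is a minimal sketch, assuming the DataStax cassandra-driver-core dependency that is introduced in the POJO section later in this article; the class name CassandraCheck is just for illustration:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class CassandraCheck {
    public static void main(String[] args) {
        // connect to the same cassandra node used throughout this article
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("192.168.133.168")
                .build();
             Session session = cluster.connect()) {
            // an empty result set here still proves the keyspace and table exist
            ResultSet rs = session.execute("SELECT word, count FROM example.wordcount");
            for (Row row : rs) {
                System.out.println(row.getString("word") + " : " + row.getLong("count"));
            }
        }
    }
}
```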
Prepare the kafka topic
- Start the kafka service;
- Create a topic named test001 with a command like the following:
```shell
./kafka-topics.sh \
  --create \
  --bootstrap-server 127.0.0.1:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic test001
```
- Enter the interactive message-producing session with a command like the following:
```shell
./kafka-console-producer.sh \
  --broker-list kafka:9092 \
  --topic test001
```
- In this session, any string you type followed by Enter is sent as a message to the broker; if you prefer sending test messages from code, see the sketch below;
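Here is a minimal sketch using the kafka-clients producer API as an alternative to the console producer; the kafka-clients dependency and the class name TestMessageSender are assumptions for illustration:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestMessageSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "127.0.0.1:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // the producer is Closeable; close() flushes any pending records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // equivalent to typing the string in the console producer session
            producer.send(new ProducerRecord<>("test001", "aaa bbb ccc aaa aaa aaa"));
        }
    }
}
```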
Source code download
If you would rather not write the code yourself, the complete source for this series can be downloaded from GitHub: https://github.com/zq2599/blog_demos
The repository contains several folders; the application for this article is in the flinksinkdemo folder, as shown in the red box in the figure below:
Two ways to write to cassandra
The official flink connector supports two ways of writing to cassandra:
- Tuple write: the fields of a Tuple object are bound, in order, to the placeholders of a specified CQL statement;
- POJO write: via the DataStax object mapper, POJO objects are mapped to the table and columns configured through annotations;
Both approaches are used in turn below; the sketch right after this paragraph previews the difference.
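To make the contrast concrete before diving in, here is a sketch of the two builder-call shapes (both appear in full in the job classes below); tupleStream and pojoStream are placeholder names for a DataStream of Tuple2 and of the annotated POJO respectively:

```java
// Tuple write: an explicit CQL statement; tuple fields bind to the ? placeholders in order
CassandraSink.addSink(tupleStream)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("192.168.133.168")
        .build();

// POJO write: no CQL statement here; the DataStax mapper derives the table and
// columns from the @Table/@Column annotations on the POJO class
CassandraSink.addSink(pojoStream)
        .setHost("192.168.133.168")
        .build();
```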
Development (Tuple write)
- Add the cassandra connector dependency to pom.xml:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
```
- Also add the flink-streaming-scala dependency, otherwise the CassandraSink.addSink code fails to compile:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```
- Create CassandraTuple2Sink.java, the job class: it reads string messages from kafka, turns them into a dataset of Tuple2 and writes that to cassandra; the key point of the write is matching the Tuple fields to the parameters of the specified CQL statement:
```java
package com.bolingcavalry.addsink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class CassandraTuple2Sink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set parallelism
        env.setParallelism(1);

        // properties for the kafka connection
        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );

        // start from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // get the DataStream via addSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        DataStream<Tuple2<String, Long>> result = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        String[] words = value.toLowerCase().split("\\s");
                        for (String word : words) {
                            // in the cassandra table, word is the primary key and must not be empty
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        result.addSink(new PrintSinkFunction<>())
                .name("print Sink")
                .disableChaining();

        CassandraSink.addSink(result)
                .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
                .setHost("192.168.133.168")
                .build()
                .name("cassandra Sink")
                .disableChaining();

        env.execute("kafka-2.4 source, cassandra-3.11.6 sink, tuple2");
    }
}
```
- In the code above, data taken from kafka goes through the wordcount processing and is then written to cassandra. Note the chain of calls after addSink, which carries the database connection parameters; this fluent style is the one recommended by flink. Also, disableChaining is called to break up the operator chain so the DAG is easier to read in the Flink web UI; in production this line can be removed.
- After coding, build with mvn clean package -U -DskipTests; this produces flinksinkdemo-1.0-SNAPSHOT.jar in the target directory;
- In the Flink web UI, upload flinksinkdemo-1.0-SNAPSHOT.jar and specify the entry class, as shown in the red box below:
- Once the job starts, the DAG looks like this:
- In the kafka producer session opened earlier, send the string "aaa bbb ccc aaa aaa aaa";
- Query cassandra: three new records have appeared (aaa=4, bbb=1, ccc=1), matching expectations:
- The TaskManager console output contains the printed Tuple2 dataset, consistent with the data in cassandra:
- The record counts of all SubTasks in the DAG also match expectations:
Development (POJO write)
Next, try the POJO write, in which instances of the business data structure are written to cassandra directly, with no CQL statement to specify:
- POJO writes to the database rely on the datastax library; add the following dependency to pom.xml:
```xml
<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.4</version>
    <classifier>shaded</classifier>
    <!-- exclude the non-shaded netty that the driver would otherwise pull in -->
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
- Note the exclusions node in the configuration above: when depending on datastax, the transitive netty dependencies are excluded, following the official guidance at https://docs.datastax.com/en/developer/java-driver/3.1/manual/shaded_jar/
- Create the entity class WordCount carrying the database annotations:
```java
package com.bolingcavalry.addsink;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {
    }

    public WordCount(String word, long count) {
        this.setWord(word);
        this.setCount(count);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return getWord() + " : " + getCount();
    }
}
```
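To see what these annotations do in isolation from flink, here is a minimal sketch that drives the DataStax object mapper directly; it assumes the com.datastax.driver.mapping classes are available (the flink cassandra connector bundles them), and WordCountMapperDemo is just an illustrative name:

```java
package com.bolingcavalry.addsink;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;

public class WordCountMapperDemo {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder()
                .addContactPoint("192.168.133.168")
                .build();
             Session session = cluster.connect()) {
            // MappingManager reads the @Table/@Column annotations on WordCount
            MappingManager manager = new MappingManager(session);
            Mapper<WordCount> mapper = manager.mapper(WordCount.class);

            // writes a row: INSERT INTO example.wordcount(word, count) VALUES ('demo', 42)
            mapper.save(new WordCount("demo", 42L));

            // reads it back by primary key
            WordCount fetched = mapper.get("demo");
            System.out.println(fetched);
        }
    }
}
```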
- Then create the job class CassandraPojoSink:
```java
package com.bolingcavalry.addsink;

import com.datastax.driver.mapping.Mapper;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class CassandraPojoSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set parallelism
        env.setParallelism(1);

        // properties for the kafka connection
        Properties properties = new Properties();
        // broker address
        properties.setProperty("bootstrap.servers", "192.168.50.43:9092");
        // zookeeper address
        properties.setProperty("zookeeper.connect", "192.168.50.43:2181");
        // consumer group id
        properties.setProperty("group.id", "flink-connector");

        // instantiate the consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "test001",
                new SimpleStringSchema(),
                properties
        );

        // start from the latest offset, i.e. skip historical messages
        flinkKafkaConsumer.setStartFromLatest();

        // get the DataStream via addSource
        DataStream<String> dataStream = env.addSource(flinkKafkaConsumer);

        DataStream<WordCount> result = dataStream
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordCount> collector) throws Exception {
                        String[] words = s.toLowerCase().split("\\s");
                        for (String word : words) {
                            // in the cassandra table, word is the primary key and must not be empty
                            if (!word.isEmpty()) {
                                collector.collect(new WordCount(word, 1L));
                            }
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordCount>() {
                    @Override
                    public WordCount reduce(WordCount wordCount, WordCount t1) throws Exception {
                        return new WordCount(wordCount.getWord(), wordCount.getCount() + t1.getCount());
                    }
                });

        result.addSink(new PrintSinkFunction<>())
                .name("print Sink")
                .disableChaining();

        CassandraSink.addSink(result)
                .setHost("192.168.133.168")
                .setMapperOptions(() -> new Mapper.Option[] {
                        Mapper.Option.saveNullFields(true)
                })
                .build()
                .name("cassandra Sink")
                .disableChaining();

        env.execute("kafka-2.4 source, cassandra-3.11.6 sink, pojo");
    }
}
```
- As the code shows, this differs considerably from the Tuple version: to produce a POJO-typed dataset, the anonymous class passed to flatMap is rewritten, an anonymous ReduceFunction is written in place of the sum aggregation, and setMapperOptions must be called to configure the mapping (see the sketch below for what else can be passed there);
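setMapperOptions hands DataStax mapper options straight through to the underlying Mapper, so other options from the driver can be passed the same way. A hedged sketch of the sink-building call from the job above, with an extra ttl option that is purely illustrative and not part of this article's job:

```java
CassandraSink.addSink(result)
        .setHost("192.168.133.168")
        .setMapperOptions(() -> new Mapper.Option[] {
                // write null POJO fields as null columns instead of skipping them
                Mapper.Option.saveNullFields(true),
                // illustrative only: expire written rows after one day
                Mapper.Option.ttl(86400)
        })
        .build()
        .name("cassandra Sink");
```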
- After building, upload the jar to flink and specify CassandraPojoSink as the entry class:
- Clear the earlier data by running TRUNCATE example.wordcount; in cqlsh;
- Send a string message to kafka as before:
- Query the database: the results match expectations:
- The DAG and SubTask details look like this:
This completes the hands-on exercise of writing flink results to cassandra; I hope it gives you a useful reference.