本文主要介紹Flink接收一個Kafka文本數據流,進行WordCount詞頻統計,然後輸出到標準輸出上。通過本文你可以瞭解如何編寫和運行Flink程序。
代碼拆解
首先要設置Flink的執行環境:
<code>// 創建Flink執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/<code>
設置Kafka相關參數,連接對應的服務器和端口號,讀取名為Shakespeare的Topic中的數據源,將數據源命名為stream:
<code>// Kafka參數Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// SourceFlinkKafkaConsumer<string> consumer = new FlinkKafkaConsumer<string>(inputTopic, new SimpleStringSchema(), properties);DataStream<string> stream = env.addSource(consumer);/<string>/<string>/<string>/<code>
使用Flink算子處理這個數據流:
<code>// Transformations// 使用Flink算子對輸入流的文本進行操作// 按空格切詞、計數、分區、設置時間窗口、聚合DataStream<tuple2>> wordCount = stream .flatMap((String line, Collector<tuple2>> collector) -> { String[] tokens = line.split("\\\\s"); // 輸出結果 (word, 1) for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<>(token, 1)); } } }) .returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1);/<tuple2>/<tuple2>/<code>
這裡使用的是Flink提供的DataStream級別的API,主要包括轉換、分組、窗口和聚合等操作。
將數據流打印:
<code>// SinkwordCount.print();/<code>
最後執行這個程序:
<code>// executeenv.execute("kafka streaming word count");/<code>
env.execute 是啟動Flink作業所必需的,只有在execute()被調用時,之前調用的各個操作才會在提交到集群上或本地計算機上執行。
完整代碼如下:
<code>import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut { public static void main(String[] args) throws Exception { // 創建Flink執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Kafka參數 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "flink-group"); String inputTopic = "Shakespeare"; String outputTopic = "WordCount"; // Source FlinkKafkaConsumer<string> consumer = new FlinkKafkaConsumer<string>(inputTopic, new SimpleStringSchema(), properties); DataStream<string> stream = env.addSource(consumer); // Transformations // 使用Flink算子對輸入流的文本進行操作 // 按空格切詞、計數、分區、設置時間窗口、聚合 DataStream<tuple2>> wordCount = stream .flatMap((String line, Collector<tuple2>> collector) -> { String[] tokens = line.split("\\\\s"); // 輸出結果 (word, 1) for (String token : tokens) { if (token.length() > 0) { collector.collect(new Tuple2<>(token, 1)); } } }) .returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); // Sink wordCount.print(); // execute env.execute("kafka streaming word count"); }}/<tuple2>/<tuple2>/<string>/<string>/<string>/<code>
執行程序
我們在 這篇文章中曾提到如何啟動一個Kafka集群,並向某個Topic內發送數據流。在本次Flink作業啟動之前,我們先要按照那篇文章中提到的方式啟動一個Kafka集群,創建對應的Topic,並向Topic中寫入數據。
Intellij Idea調試執行
在IntelliJ Idea中,點擊綠色按鈕,執行這個程序。下圖中任意兩個綠色按鈕都可以啟動程序。
IntelliJ Idea下方會顯示程序中輸出到標準輸出上的內容,包括本次需要打印的結果。
恭喜你,你的第一個Flink程序運行成功!
在集群上提交作業
第一步中我們已經下載並搭建了本地集群,接著我們在模板的基礎上添加了代碼,並可以在IntelliJ Idea中調試運行。在生產環境,一般需要將代碼編譯打包,提交到集群上。
注意,這裡涉及兩個目錄,一個是我們存放我們剛剛編寫代碼的工程目錄,簡稱工程目錄,另一個是從Flink官網下載解壓的Flink主目錄,主目錄下的bin目錄中有Flink提供好的命令行工具。
進入工程目錄,使用Maven命令行編譯打包:
<code># 使用Maven將自己的代碼編譯打包# 打好的包一般放在工程目錄的target子文件夾下$ mvn clean package/<code>
回到剛剛下載解壓的Flink主目錄,使用Flink提供的命令行工具flink,將我們剛剛打包好的作業提交到集群上。命令行的參數--class用來指定哪個主類作為入口。我們之後會介紹命令行的具體使用方法。
<code>$ bin/flink run --class com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut /Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar/<code>
這時,儀表盤上就多了一個Flink程序。
程序的輸出會打到Flink主目錄下面的log目錄下的.out文件中,使用下面的命令查看結果:
<code>$ tail -f log/flink-*-taskexecutor-*.out/<code>
停止本地集群:
<code>$ ./bin/stop-cluster.sh/<code>
Flink開發和調試過程中,一般有幾種方式執行程序:
- 使用IntelliJ Idea內置的運行按鈕。這種方式主要在本地調試時使用。
- 使用Flink提供的標準命令行工具向集群提交作業,包括Java和Scala程序。這種方式更適合生產環境。
- 使用Flink提供的其他命令行工具,比如針對Scala、Python和SQL的交互式環境。這種方式也是在調試時使用。
閱讀更多 皮皮魯的AI星球 的文章