Flink入門:讀取Kafka實時數據流,實現WordCount

本文主要介紹Flink接收一個Kafka文本數據流,進行WordCount詞頻統計,然後輸出到標準輸出上。通過本文你可以瞭解如何編寫和運行Flink程序。

代碼拆解

首先要設置Flink的執行環境:

<code>// 創建Flink執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/<code>

設置Kafka相關參數,連接對應的服務器和端口號,讀取名為Shakespeare的Topic中的數據源,將數據源命名為stream:

<code>// Kafka參數Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// SourceFlinkKafkaConsumer<string> consumer =                new FlinkKafkaConsumer<string>(inputTopic, new SimpleStringSchema(), properties);DataStream<string> stream = env.addSource(consumer);/<string>/<string>/<string>/<code>

使用Flink算子處理這個數據流:

<code>// Transformations// 使用Flink算子對輸入流的文本進行操作// 按空格切詞、計數、分區、設置時間窗口、聚合DataStream<tuple2>> wordCount = stream    .flatMap((String line, Collector<tuple2>> collector) -> {      String[] tokens = line.split("\\\\s");      // 輸出結果 (word, 1)      for (String token : tokens) {        if (token.length() > 0) {          collector.collect(new Tuple2<>(token, 1));        }      }    })    .returns(Types.TUPLE(Types.STRING, Types.INT))    .keyBy(0)    .timeWindow(Time.seconds(5))    .sum(1);/<tuple2>/<tuple2>/<code>

這裡使用的是Flink提供的DataStream級別的API,主要包括轉換、分組、窗口和聚合等操作。

將數據流打印:

<code>// SinkwordCount.print();/<code>

最後執行這個程序:

<code>// executeenv.execute("kafka streaming word count");/<code>

env.execute 是啟動Flink作業所必需的,只有在execute()被調用時,之前調用的各個操作才會在提交到集群上或本地計算機上執行。

完整代碼如下:

<code>import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut {    public static void main(String[] args) throws Exception {        // 創建Flink執行環境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // Kafka參數        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "localhost:9092");        properties.setProperty("group.id", "flink-group");        String inputTopic = "Shakespeare";        String outputTopic = "WordCount";        // Source        FlinkKafkaConsumer<string> consumer =                new FlinkKafkaConsumer<string>(inputTopic, new SimpleStringSchema(), properties);        DataStream<string> stream = env.addSource(consumer);        // Transformations        // 使用Flink算子對輸入流的文本進行操作        // 按空格切詞、計數、分區、設置時間窗口、聚合        DataStream<tuple2>> wordCount = stream            .flatMap((String line, Collector<tuple2>> collector) -> {                String[] tokens = line.split("\\\\s");                // 輸出結果 (word, 1)                for (String token : tokens) {                    if (token.length() > 0) {                        collector.collect(new Tuple2<>(token, 1));                    }                }            })            .returns(Types.TUPLE(Types.STRING, Types.INT))            .keyBy(0)            .timeWindow(Time.seconds(5))            .sum(1);        // Sink        wordCount.print();        // execute        env.execute("kafka streaming word count");    }}/<tuple2>/<tuple2>/<string>/<string>/<string>/<code>

執行程序

我們在 這篇文章中曾提到如何啟動一個Kafka集群,並向某個Topic內發送數據流。在本次Flink作業啟動之前,我們先要按照那篇文章中提到的方式啟動一個Kafka集群,創建對應的Topic,並向Topic中寫入數據。

Intellij Idea調試執行

在IntelliJ Idea中,點擊綠色按鈕,執行這個程序。下圖中任意兩個綠色按鈕都可以啟動程序。

Flink入門:讀取Kafka實時數據流,實現WordCount

IntelliJ Idea下方會顯示程序中輸出到標準輸出上的內容,包括本次需要打印的結果。

Flink入門:讀取Kafka實時數據流,實現WordCount

恭喜你,你的第一個Flink程序運行成功!

在集群上提交作業

第一步中我們已經下載並搭建了本地集群,接著我們在模板的基礎上添加了代碼,並可以在IntelliJ Idea中調試運行。在生產環境,一般需要將代碼編譯打包,提交到集群上。

注意,這裡涉及兩個目錄,一個是我們存放我們剛剛編寫代碼的工程目錄,簡稱工程目錄,另一個是從Flink官網下載解壓的Flink主目錄,主目錄下的bin目錄中有Flink提供好的命令行工具。

進入工程目錄,使用Maven命令行編譯打包:

<code># 使用Maven將自己的代碼編譯打包# 打好的包一般放在工程目錄的target子文件夾下$ mvn clean package/<code>

回到剛剛下載解壓的Flink主目錄,使用Flink提供的命令行工具flink,將我們剛剛打包好的作業提交到集群上。命令行的參數--class用來指定哪個主類作為入口。我們之後會介紹命令行的具體使用方法。

<code>$ bin/flink run --class com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut /Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar/<code> 

這時,儀表盤上就多了一個Flink程序。

Flink入門:讀取Kafka實時數據流,實現WordCount

程序的輸出會打到Flink主目錄下面的log目錄下的.out文件中,使用下面的命令查看結果:

<code>$ tail -f log/flink-*-taskexecutor-*.out/<code>

停止本地集群:

<code>$ ./bin/stop-cluster.sh/<code>

Flink開發和調試過程中,一般有幾種方式執行程序:

  1. 使用IntelliJ Idea內置的運行按鈕。這種方式主要在本地調試時使用。
  2. 使用Flink提供的標準命令行工具向集群提交作業,包括Java和Scala程序。這種方式更適合生產環境。
  3. 使用Flink提供的其他命令行工具,比如針對Scala、Python和SQL的交互式環境。這種方式也是在調試時使用。


分享到:


相關文章: