Storm流分組策略(Stream Groupings)實戰案例講解

一、什麼是Stream Groupings(數據流分組策略)

topology(拓撲)定義中有一部分是為每一個 bolt 指定輸入的 streams . stream grouping 定義了stream 如何在 Bolts tasks 之間分區.

Bolt 和 Sport 都是多線程的,數據在流動的時候,在多個線程中決定流向哪一個線程 就需要Stream Groupings。有些類似SQL中的Group By,用來制定這些計算是怎麼分組的。

Storm流分組策略(Stream Groupings)實戰案例講解

我們主要講三種比較常見的數據流分組策略

  • Shuffle grouping: Tuple 隨機的分發到 Bolt Task, 每個 Bolt 獲取到等量的 Tuple.
  • Fields grouping: streams 通過 grouping 指定的字段來分區. 例如:流通過 "user-id" 字段分區, 具有相同 "user-id" 的 Tuple 會發送到同一個task, 不同 "user-id" 的 Tuple 可能會流入到不同的 tasks.
  • Global grouping: 整個 stream 會進入 Bolt 其中一個任務.特別指出, 它會進入 id 最小的 task.

實戰演練分組策略

我們還是基於我們之前的代碼來演示:

代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

項目名:spring-boot-storm

1、Shuffle grouping

  • 作業:NumberSumTopology
  • 核心代碼:
System.out.println("Thread id: " + Thread.currentThread().getId() + " , rece data is : " + value);

topologyBuilder.setBolt("sum_bolt_id", new NumberSumbolt(), 2).shuffleGrouping("number_spout_id");
  • 提交我們的作業

$STORM_HOME/bin/storm jar /usr/local/src/spring-boot-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.jikeh.Topology.NumberSumTopology

  • 分析結果:

2個線程依次在執行打印結果

2、Fields grouping

  • 作業:NumberSumTopology

核心代碼:

this.collector.emit(new Values(i%2, ++i));
declarer.declare(new Fields("flag","num"));

topologyBuilder.setBolt("sum_bolt_id", new NumberSumbolt(), 3).fieldsGrouping("number_spout_id", new Fields("flag"));

提交我們的作業:

$STORM_HOME/bin/storm jar /usr/local/src/spring-boot-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.jikeh.Topology.NumberSumTopology

分析結果:奇數、偶數分別出現在兩個不同的線程中

  • 作業:WordCountTopology

測試數據:

welcome to visit our TouTiao Number 極客慧
welcome to visit our TouTiao Number 極客慧

提交我們的作業:

$STORM_HOME/bin/storm jar /usr/local/src/spring-boot-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.jikeh.Topology.WordCountTopology

核心代碼:

topologyBuilder.setBolt("count_bolt_id", new WordCountbolt(), 2).fieldsGrouping("split_bolt_id", new Fields("words"));

分析結果:相同的單詞出現在同一個線程中

  • 拓展

問:如果tuple的分類數 < 線程數,會出現什麼情況呢?

一個線程會浪費,即:存在資源浪費情況

問:如果tuple的分類數 > 線程數,又會出現什麼情況呢?

數據並不會丟失,而是都會發送到一個線程裡面

3、Global grouping

有些知識,文字描述起來不太方便,如果你理解上有難度,歡迎觀看視頻教程:


分享到:


相關文章: