初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

Storm入門秘籍

0、需求

為了分析廣告投放的效果,為了計算投放廣告的收益,我們需要實時收集用戶的請求日誌,曝光日誌,點擊日誌,我們需要一個實時計算平臺。

Storm是開源的、分佈式、流式計算系統。

什麼是分佈式呢?就是將一個任務拆解給多個計算機去執行,讓許多機器共通完成同一個任務,把這個多機的細節給屏蔽,對外提供同一個接口、同一個服務,這樣的系統就是分佈式系統,如:storm集群、redis集群、mysql集群等等。

本篇文章,我們僅介紹storm的快速入門,基礎案例,後續我們會分享:storm的集群架構、並行度、可靠性、在廣告系統中的應用

一、引言

大數據一直是近年的熱點話題,隨著數據量的急速增長,數據處理的規模也從GB 級別增長到TB 級別,很多圖像應用領域已經開始處理PB 級別的數據分析。大數據的核心目標是提升業務的競爭力,找到一些可以採取行動的洞察(Actionable Insight),數據分析就是其中的核心技術,包括數據收集、處理、建模和分析,最後找到改進業務的方案。

最近一兩年,隨著大數據分析需求的爆炸性增長,很多公司都經歷過將以關係型商用數據庫為基礎的數據平臺,轉移到一些開源生態的大數據平臺,例如Hadoop 或Spark 平臺,以可控的軟硬件成本處理更大的數據量。Hadoop 設計之初就是為了批量處理大數據,但數據處理實時性經常是它的弱點。例如,很多時候一個MapReduce 腳本的執行,很難估計需要多長時間才能完成,無法滿足很多數據分析師所期望的秒級返回查詢結果的分析需求。

為了解決數據實時性的問題,大部分公司都有一個經歷,將數據分析變成更加實時的可交互方案。其中,涉及新軟件的引入、數據流的改進等。數據分析的幾種常見方法如下圖。

初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

整個數據分析的基礎架構通常分為以下幾類。

  • 使用Hadoop/Spark 的MR 分析。
  • 將Hadoop/Spark 的結果注入RDBMS 中提供實時分析。
  • 將結果注入到容量更大的NoSQL 中,例如HBase 等。
  • 將數據源進行流式處理,對接流式計算框架,如Storm,結果落在RDBMS/NoSQL 中。
  • 將數據源進行流式處理,對接分析數據庫,例如Druid、Vertica 等。

之前我們講過了Hadoop基礎實例,這篇文章,我們重點講下storm的基礎入門,為後續進行日誌分析做好理論鋪墊。歡迎持續關注本頭條號,後面精彩不容錯過……也會講解druid

初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

二、編程模型&數據處理流程圖

1、編程模型

初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

如圖所示:這樣的一個Topology(拓撲),在Storm中,就稱為用戶的一個作業)。這個拓撲包含了許多的節點,以及這些節點之間的邊。這些點有兩種:數據源節點(Spout)、計算節點(Bolt),點之間的邊稱為數據流(Stream),數據流又由很多Tuple組成。

在圖中這個Topology裡面,我們看到了兩個Spout和5個Bolt,在實際運行的時候,每個Spout節點都可能有很多個實例,每個Bolt也有可能有很多個實例,下篇文章我們會詳細介紹下storm的並行度。

2、數據處理流程圖

結合上面的變成模型,便有了下面的數據處理流程圖:

初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

每一個“水龍頭”表示一個Spout,它會不間斷地發送一些Tuple給下游的Bolt,這些Bolt經過處理,再發送一個Tuple給下一個Bolt,最後,在這些Bolt裡面是可以執行一些寫數據到外部存儲(如數據庫)等操作的。

3、專業術語概覽

1、Topology

實時應用程序的邏輯被封裝在 Storm topology(拓撲)中. Storm topology(拓撲)類似於 MapReduce 作業. 兩者之間關鍵的區別是 MapReduce 作業最終會完成, 而 topology(拓撲)任務會永遠運行(除非 kill 掉它). 一個拓撲是 Spout 和 Bolt 通過 stream groupings 連接起來的有向無環圖.

2:Stream

拓撲中的消息流,傳輸的對象是Tuple,由一系列連續的 Tuple 組成。

stream 是 Storm 中的核心概念.一個 stream 是一個無界的、以分佈式方式並行創建和處理的 Tuple 序列. 默認情況下 Tuple 可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 等數據類型.你也可以定義自己的 serializers, 以至於可以在 Tuple 中使用自定義的類型.

每一個流在聲明的時候會賦予一個 ID. 由於只包含一個 stream 的 Spout 和 Bolt 比較常見, OutputFieldsDeclarer有更方便的方法可以定義一個單一的 stream 而不用指定ID. 這個 stream 被賦予一個默認的 ID, "default".

3:Spout

Spout 是一個 topology(拓撲)中 streams 的源頭. 通常 Spout 會從外部數據源讀取 Tuple,然後把他們發送到拓撲中(如 Kestel 隊列, 或者 Twitter API).

4:Bolt

Storm編程模型中的處理組件,定義execute方法進行實際的數據邏輯處理。拓撲中所有的業務處理都在 Bolts 中完成. Bolt 可以做很多事情,過濾, 函數, 聚合, 關聯, 與數據庫交互等.

Bolt 可以做簡單 stream 轉換. 複雜的 stream 轉換一般需要多個步驟).

5:Tuple

一次消息傳遞的基本單元

6:Stream Groupings數據流分組策略

topology(拓撲)定義中有一部分是為每一個 bolt 指定輸入的 streams . stream grouping 定義了stream 如何在 Bolts tasks 之間分區.

Bolt 和 Sport 都是多線程的,數據在流動的時候,在多個線程中決定流向哪一個線程 就需要Stream Groupings。有些類似SQL中的Group By,用來制定這些計算是怎麼分組的。

Shuffle Grouping:隨機分組,保證bolt接受的tuple數據相同。

Fileds Grouping:按字段分組,相同tuple會分到一個bolt中。同一個單詞會指定到同一個線程裡。

Global Grouping:全局分組,所有tuple發送給task_id最小的bolt。

三、流處理編程結構介紹

其實,幾乎都是標準模板結構:我們往裡套就OK了

1、spout

在 Storm 的結構中 spout 承擔了消息的生成功能。常用方法如下:

/**
* Called when a task for this component is initialized within a worker on the cluster.
* It provides the spout with the environment in which the spout executes.
*/
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

/**
* When this method is called, Storm is requesting that the Spout emit tuples to the
* output collector. This method should be non-blocking, so if the Spout has no tuples
* to emit, this method should return. nextTuple, ack, and fail are all called in a tight
* loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
* to have nextTuple sleep for a short amount of time (like a single millisecond)
* so as not to waste too much CPU.
*/
void nextTuple();

/**
* Declare the output schema for all the streams of this topology.
*/
void declareOutputFields(OutputFieldsDeclarer declarer);

上面是原文介紹,這裡做些簡單總結:

  • 第一個被調用的 spout 方法都是open函數,主要做一下初始化的操作。它接收如下參數:配置對象,在定義topology 對象是創建;TopologyContext 對象,包含所有拓撲數據;還有SpoutOutputCollector 對象,它能讓我們發佈交給 bolts 處理的數據。
  • Spout 中的最主要的方法是 nextTuple,nextTuple函數將在一個 while(ture) 中被調用。 nextTuple 要麼向 topology(拓撲)中發送一個新的 Tuple, 要麼在沒有 Tuple 需要發送的情況下直接返回. 對於任何 Spout 實現, nextTuple 方法都必須非阻塞的, 因為 Storm 在一個線程中調用所有的 Spout 方法.
  • 通過設置declareOutputFields函數,Spouts 可以 emit 多個 stream。OutputFieldsDeclarer: 用於聲明 streams 和它的 schemas。

2、bolt

Bolt 是基本的消息處理單元,可以處理所有的邏輯操作。常用方法如下:

/**
* Called when a task for this component is initialized within a worker on the cluster.
* It provides the bolt with the environment in which the bolt executes.
*/
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

/**
* Process a single tuple of input. The Tuple object contains metadata on it
* about which component/stream/task it came from. The values of the Tuple can
* be accessed using Tuple#getValue. The IBolt does not have to process the Tuple
* immediately. It is perfectly fine to hang onto a tuple and process it later
* (for instance, to do an aggregation or join).
*
* Tuples should be emitted using the OutputCollector provided through the prepare method.
* It is required that all input tuples are acked or failed at some point using the OutputCollector.
* Otherwise, Storm will be unable to determine when tuples coming off the spouts
* have been completed.
*
* For the common case of acking an input tuple at the end of the execute method,
* see IBasicBolt which automates this.
*/
void execute(Tuple input);

/**
* Declare the output schema for all the streams of this topology. @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream

*/
void declareOutputFields(OutputFieldsDeclarer declarer);

上面是原文介紹,這裡做些簡單總結:

  • prepare函數做些初始化操作
  • _bolt_最重要的方法是void execute(Tuple input),每次接收到元組時都會被調用一次,還會再發布若干個元組。

四、Storm基礎實例

代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

項目名:spring-boot-storm

1、求和:1+2+3……n

1)topology架構圖:

初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

2)代碼實現

2、wordcount:單詞計數

源數據:

welcome to visit our website jikeh and take a look at our premium courses
welcome to visit our/>welcome to visit our toutiao and take a look at our articles

1)topology架構圖:

初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程

2)代碼實現


分享到:


相關文章: