Storm入門秘籍
0、需求
為了分析廣告投放的效果,為了計算投放廣告的收益,我們需要實時收集用戶的請求日誌,曝光日誌,點擊日誌,我們需要一個實時計算平臺。
Storm是開源的、分佈式、流式計算系統。
什麼是分佈式呢?就是將一個任務拆解給多個計算機去執行,讓許多機器共通完成同一個任務,把這個多機的細節給屏蔽,對外提供同一個接口、同一個服務,這樣的系統就是分佈式系統,如:storm集群、redis集群、mysql集群等等。
本篇文章,我們僅介紹storm的快速入門,基礎案例,後續我們會分享:storm的集群架構、並行度、可靠性、在廣告系統中的應用
一、引言
大數據一直是近年的熱點話題,隨著數據量的急速增長,數據處理的規模也從GB 級別增長到TB 級別,很多圖像應用領域已經開始處理PB 級別的數據分析。大數據的核心目標是提升業務的競爭力,找到一些可以採取行動的洞察(Actionable Insight),數據分析就是其中的核心技術,包括數據收集、處理、建模和分析,最後找到改進業務的方案。
最近一兩年,隨著大數據分析需求的爆炸性增長,很多公司都經歷過將以關係型商用數據庫為基礎的數據平臺,轉移到一些開源生態的大數據平臺,例如Hadoop 或Spark 平臺,以可控的軟硬件成本處理更大的數據量。Hadoop 設計之初就是為了批量處理大數據,但數據處理實時性經常是它的弱點。例如,很多時候一個MapReduce 腳本的執行,很難估計需要多長時間才能完成,無法滿足很多數據分析師所期望的秒級返回查詢結果的分析需求。
為了解決數據實時性的問題,大部分公司都有一個經歷,將數據分析變成更加實時的可交互方案。其中,涉及新軟件的引入、數據流的改進等。數據分析的幾種常見方法如下圖。
![初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程](http://p2.ttnews.xyz/loading.gif)
整個數據分析的基礎架構通常分為以下幾類。
- 使用Hadoop/Spark 的MR 分析。
- 將Hadoop/Spark 的結果注入RDBMS 中提供實時分析。
- 將結果注入到容量更大的NoSQL 中,例如HBase 等。
- 將數據源進行流式處理,對接流式計算框架,如Storm,結果落在RDBMS/NoSQL 中。
- 將數據源進行流式處理,對接分析數據庫,例如Druid、Vertica 等。
之前我們講過了Hadoop基礎實例,這篇文章,我們重點講下storm的基礎入門,為後續進行日誌分析做好理論鋪墊。歡迎持續關注本頭條號,後面精彩不容錯過……也會講解druid
![初識實時流處理Storm,掌握其編程模型、核心概念、數據處理流程](http://p2.ttnews.xyz/loading.gif)
二、編程模型&數據處理流程圖
1、編程模型
如圖所示:這樣的一個Topology(拓撲),在Storm中,就稱為用戶的一個作業)。這個拓撲包含了許多的節點,以及這些節點之間的邊。這些點有兩種:數據源節點(Spout)、計算節點(Bolt),點之間的邊稱為數據流(Stream),數據流又由很多Tuple組成。
在圖中這個Topology裡面,我們看到了兩個Spout和5個Bolt,在實際運行的時候,每個Spout節點都可能有很多個實例,每個Bolt也有可能有很多個實例,下篇文章我們會詳細介紹下storm的並行度。
2、數據處理流程圖
結合上面的變成模型,便有了下面的數據處理流程圖:
每一個“水龍頭”表示一個Spout,它會不間斷地發送一些Tuple給下游的Bolt,這些Bolt經過處理,再發送一個Tuple給下一個Bolt,最後,在這些Bolt裡面是可以執行一些寫數據到外部存儲(如數據庫)等操作的。
3、專業術語概覽
1、Topology
實時應用程序的邏輯被封裝在 Storm topology(拓撲)中. Storm topology(拓撲)類似於 MapReduce 作業. 兩者之間關鍵的區別是 MapReduce 作業最終會完成, 而 topology(拓撲)任務會永遠運行(除非 kill 掉它). 一個拓撲是 Spout 和 Bolt 通過 stream groupings 連接起來的有向無環圖.
2:Stream
拓撲中的消息流,傳輸的對象是Tuple,由一系列連續的 Tuple 組成。
stream 是 Storm 中的核心概念.一個 stream 是一個無界的、以分佈式方式並行創建和處理的 Tuple 序列. 默認情況下 Tuple 可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 等數據類型.你也可以定義自己的 serializers, 以至於可以在 Tuple 中使用自定義的類型.
每一個流在聲明的時候會賦予一個 ID. 由於只包含一個 stream 的 Spout 和 Bolt 比較常見, OutputFieldsDeclarer有更方便的方法可以定義一個單一的 stream 而不用指定ID. 這個 stream 被賦予一個默認的 ID, "default".
3:Spout
Spout 是一個 topology(拓撲)中 streams 的源頭. 通常 Spout 會從外部數據源讀取 Tuple,然後把他們發送到拓撲中(如 Kestel 隊列, 或者 Twitter API).
4:Bolt
Storm編程模型中的處理組件,定義execute方法進行實際的數據邏輯處理。拓撲中所有的業務處理都在 Bolts 中完成. Bolt 可以做很多事情,過濾, 函數, 聚合, 關聯, 與數據庫交互等.
Bolt 可以做簡單 stream 轉換. 複雜的 stream 轉換一般需要多個步驟).
5:Tuple
一次消息傳遞的基本單元
6:Stream Groupings數據流分組策略
topology(拓撲)定義中有一部分是為每一個 bolt 指定輸入的 streams . stream grouping 定義了stream 如何在 Bolts tasks 之間分區.
Bolt 和 Sport 都是多線程的,數據在流動的時候,在多個線程中決定流向哪一個線程 就需要Stream Groupings。有些類似SQL中的Group By,用來制定這些計算是怎麼分組的。
Shuffle Grouping:隨機分組,保證bolt接受的tuple數據相同。
Fileds Grouping:按字段分組,相同tuple會分到一個bolt中。同一個單詞會指定到同一個線程裡。
Global Grouping:全局分組,所有tuple發送給task_id最小的bolt。
三、流處理編程結構介紹
其實,幾乎都是標準模板結構:我們往裡套就OK了
1、spout
在 Storm 的結構中 spout 承擔了消息的生成功能。常用方法如下:
/**
* Called when a task for this component is initialized within a worker on the cluster.
* It provides the spout with the environment in which the spout executes.
*/
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
/**
* When this method is called, Storm is requesting that the Spout emit tuples to the
* output collector. This method should be non-blocking, so if the Spout has no tuples
* to emit, this method should return. nextTuple, ack, and fail are all called in a tight
* loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
* to have nextTuple sleep for a short amount of time (like a single millisecond)
* so as not to waste too much CPU.
*/
void nextTuple();
/**
* Declare the output schema for all the streams of this topology.
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
上面是原文介紹,這裡做些簡單總結:
- 第一個被調用的 spout 方法都是open函數,主要做一下初始化的操作。它接收如下參數:配置對象,在定義topology 對象是創建;TopologyContext 對象,包含所有拓撲數據;還有SpoutOutputCollector 對象,它能讓我們發佈交給 bolts 處理的數據。
- Spout 中的最主要的方法是 nextTuple,nextTuple函數將在一個 while(ture) 中被調用。 nextTuple 要麼向 topology(拓撲)中發送一個新的 Tuple, 要麼在沒有 Tuple 需要發送的情況下直接返回. 對於任何 Spout 實現, nextTuple 方法都必須非阻塞的, 因為 Storm 在一個線程中調用所有的 Spout 方法.
- 通過設置declareOutputFields函數,Spouts 可以 emit 多個 stream。OutputFieldsDeclarer: 用於聲明 streams 和它的 schemas。
2、bolt
Bolt 是基本的消息處理單元,可以處理所有的邏輯操作。常用方法如下:
/**
* Called when a task for this component is initialized within a worker on the cluster.
* It provides the bolt with the environment in which the bolt executes.
*/
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
* Process a single tuple of input. The Tuple object contains metadata on it
* about which component/stream/task it came from. The values of the Tuple can
* be accessed using Tuple#getValue. The IBolt does not have to process the Tuple
* immediately. It is perfectly fine to hang onto a tuple and process it later
* (for instance, to do an aggregation or join).
*
* Tuples should be emitted using the OutputCollector provided through the prepare method.
* It is required that all input tuples are acked or failed at some point using the OutputCollector.
* Otherwise, Storm will be unable to determine when tuples coming off the spouts
* have been completed.
*
* For the common case of acking an input tuple at the end of the execute method,
* see IBasicBolt which automates this.
*/
void execute(Tuple input);
/**
* Declare the output schema for all the streams of this topology. @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
上面是原文介紹,這裡做些簡單總結:
- prepare函數做些初始化操作
- _bolt_最重要的方法是void execute(Tuple input),每次接收到元組時都會被調用一次,還會再發布若干個元組。
四、Storm基礎實例
代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
項目名:spring-boot-storm
1、求和:1+2+3……n
1)topology架構圖:
2)代碼實現
2、wordcount:單詞計數
源數據:
welcome to visit our website jikeh and take a look at our premium courses
welcome to visit our/>welcome to visit our toutiao and take a look at our articles
1)topology架構圖:
2)代碼實現
閱讀更多 極客慧 的文章