本文從兩個方面介紹了Storm是如何保證可靠性的:進程級別、消息級別。最後,我們會通過一個實例來說明,在真實業務場景中,我們應該怎麼做,才能保證Storm的可靠性。
一、Storm Daemon(守護進程)的可靠性與容錯性
Storm 有幾個不同的守護進程,如:調度 workers 的 Nimbus、啟動和殺死 workers 的 supervisors、可以訪問日誌的 log viewer(日誌查看器)以及顯示集群狀態 UI等. 我們看下面幾種場景:
- 當一個 node(節點)掛掉時會發生什麼?
分配給該機器的 task(任務)將超時, Nimbus 將這些 task(任務)重新分配給其他機器.
- 當一個 worker 掛掉時會發生什麼?
當一個 worker 掛掉時, supervisor 將會重啟它. 如果在重啟它時繼續發生故障並且沒有發送 hearbeat(心跳)給 Nimbus, 那麼 Nimbus 將會重新調度 worker,即分配到其它機器上進行啟動.
- 當 Nimbus 或 Supervisor 掛掉時會發生什麼?
- Nimbus 和 Supervisor 守護進程是為 fail-fast(快速失敗)(遇到任何意外情況時進程自毀)和無狀態(所有狀態都保存在 Zookeeper 或磁盤上)而設計的. 像 部署 Storm 集群 中描述的一樣, Nimbus 和 Supervisor 守護進程必須使用 daemontools 或 monit 工具進行監督. 所以如果 Nimbus 或 Supervisor 守護進程掛掉後, 它們會像什麼都沒發生一樣重啟.
最值得注意的是, 沒有 worker 進程受到 Nimbus 或 Supervisors 掛掉的影響. 這與 Hadoop 相反, 如果 JobTracker 掛掉, 所有正在運行的 job 作業都將丟失.
- Nimbus 是單點故障的嗎?
不是。
如果你失去了 Nimbus 節點, workers 仍然會繼續工作. 此外,即使workers掛掉了, supervisors也會繼續重新啟動 workers. 但是,如果沒有 Nimbus,worker 在必要時不會重新分配給其他機器(如失去 worker 機器).
Storm Nimbus 自 1.0.0 以來是 highly available(高可用的),即:有備節點.
二、保證消息處理的可靠性(ack和fail機制)
1、原理介紹
storm的可靠性是由spout和bolt共同決定的,storm利用了ack和fail機制來保證消息處理的可靠性。
- 如果spout發射的一個tuple被完全處理,那麼spout的ack方法即會被調用,如果失敗,則其fail方法便會被調用。
一個tuple及其子tuple共同構成了一個tupletree,當這個tree中所有tuple在指定時間內都完成時,spout的ack才會被調用,但是,當tree中任何一個tuple失敗時,spout的fail方法都會被調用。
- 在bolt中,如果處理tuple成功,則調用bolt的ack方法,如果失敗,則調用其fail方法。
2、具體代碼實現
代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
項目名:spring-boot-storm
- spout修改
nextTuple():
//tuple的唯一編號 msgId,可以根據你實際的業務場景來定義這個msgId
UUID msgId = UUID. randomUUID();
this.collector.emit(new Values(i++), msgId);
新增兩個方法ack()、fail():
/**
* 處理成功被調用:
* @param msgId
*/
@Override
public void ack(Object msgId) {
System.out.println("ack invoked msgId = [" + msgId + "]");
}
/**
* 處理失敗被調用:
* @param msgId
*/
@Override
public void fail(Object msgId) {
System.out.println("fail invoked msgId = [" + msgId + "]");
/**
* 對於失敗的數據你要如何處理呢?
* 1、重新發送消息,下游要注意冪等性處理
* 2、持久化存儲錯誤的消息編號
* 3、報警
*/
}
- bolt修改
/**
* 模擬成功失敗場景:
*/
if(value % 2 == 0) {
this.collector.ack(input); // 確認消息處理成功
} else {
this.collector.fail(input); // 確認消息處理失敗
}
- topology修改
切換到本地模式:
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("sum_topology", new Config(), topologyBuilder.createTopology());
3、在本地模式下運行我們的storm作業
右鍵直接運行即可……
4、分析結果
很明顯,符合我們的預期:奇數全部處理失敗,偶數全部處理成功。
閱讀更多 極客慧 的文章