Java工程師必須掌握的Storm可靠性策略實戰演練

本文從兩個方面介紹了Storm是如何保證可靠性的:進程級別、消息級別。最後,我們會通過一個實例來說明,在真實業務場景中,我們應該怎麼做,才能保證Storm的可靠性。

一、Storm Daemon(守護進程)的可靠性與容錯性

Storm 有幾個不同的守護進程,如:調度 workers 的 Nimbus、啟動和殺死 workers 的 supervisors、可以訪問日誌的 log viewer(日誌查看器)以及顯示集群狀態 UI等. 我們看下面幾種場景:

  • 當一個 node(節點)掛掉時會發生什麼?

分配給該機器的 task(任務)將超時, Nimbus 將這些 task(任務)重新分配給其他機器.

  • 當一個 worker 掛掉時會發生什麼?

當一個 worker 掛掉時, supervisor 將會重啟它. 如果在重啟它時繼續發生故障並且沒有發送 hearbeat(心跳)給 Nimbus, 那麼 Nimbus 將會重新調度 worker,即分配到其它機器上進行啟動.

  • 當 Nimbus 或 Supervisor 掛掉時會發生什麼?
  • Nimbus 和 Supervisor 守護進程是為 fail-fast(快速失敗)(遇到任何意外情況時進程自毀)和無狀態(所有狀態都保存在 Zookeeper 或磁盤上)而設計的. 像 部署 Storm 集群 中描述的一樣, Nimbus 和 Supervisor 守護進程必須使用 daemontools 或 monit 工具進行監督. 所以如果 Nimbus 或 Supervisor 守護進程掛掉後, 它們會像什麼都沒發生一樣重啟.

最值得注意的是, 沒有 worker 進程受到 Nimbus 或 Supervisors 掛掉的影響. 這與 Hadoop 相反, 如果 JobTracker 掛掉, 所有正在運行的 job 作業都將丟失.

  • Nimbus 是單點故障的嗎?

不是。

如果你失去了 Nimbus 節點, workers 仍然會繼續工作. 此外,即使workers掛掉了, supervisors也會繼續重新啟動 workers. 但是,如果沒有 Nimbus,worker 在必要時不會重新分配給其他機器(如失去 worker 機器).

Storm Nimbus 自 1.0.0 以來是 highly available(高可用的),即:有備節點.

二、保證消息處理的可靠性(ack和fail機制)

1、原理介紹

storm的可靠性是由spout和bolt共同決定的,storm利用了ack和fail機制來保證消息處理的可靠性。

  • 如果spout發射的一個tuple被完全處理,那麼spout的ack方法即會被調用,如果失敗,則其fail方法便會被調用。

一個tuple及其子tuple共同構成了一個tupletree,當這個tree中所有tuple在指定時間內都完成時,spout的ack才會被調用,但是,當tree中任何一個tuple失敗時,spout的fail方法都會被調用。

  • 在bolt中,如果處理tuple成功,則調用bolt的ack方法,如果失敗,則調用其fail方法。

2、具體代碼實現

代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

項目名:spring-boot-storm

  • spout修改

nextTuple():

//tuple的唯一編號 msgId,可以根據你實際的業務場景來定義這個msgId
UUID msgId = UUID. randomUUID();
this.collector.emit(new Values(i++), msgId);

新增兩個方法ack()、fail():

/**
* 處理成功被調用:
* @param msgId
*/
@Override
public void ack(Object msgId) {
System.out.println("ack invoked msgId = [" + msgId + "]");
}

/**
* 處理失敗被調用:
* @param msgId
*/
@Override
public void fail(Object msgId) {
System.out.println("fail invoked msgId = [" + msgId + "]");

/**
* 對於失敗的數據你要如何處理呢?
* 1、重新發送消息,下游要注意冪等性處理
* 2、持久化存儲錯誤的消息編號
* 3、報警
*/

}
  • bolt修改
/**
* 模擬成功失敗場景:
*/
if(value % 2 == 0) {
this.collector.ack(input); // 確認消息處理成功
} else {
this.collector.fail(input); // 確認消息處理失敗
}
  • topology修改

切換到本地模式:

LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("sum_topology", new Config(), topologyBuilder.createTopology());

3、在本地模式下運行我們的storm作業

右鍵直接運行即可……

4、分析結果

Java工程師必須掌握的Storm可靠性策略實戰演練

很明顯,符合我們的預期:奇數全部處理失敗,偶數全部處理成功。


分享到:


相關文章: