使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

在Anomalia Machina系列的第十篇也是最後一篇博客中,我們調試了異常檢測系統,併成功地將應用程序從3個擴展到了48個Cassandra節點,並獲得了令人印象深刻的數量:574個CPU內核(橫跨Cassandra,Kafka和Kubernetes集群) ,每秒230萬次寫入Kafka(峰值),每秒220,000次異常檢查(可持續),每天的異常檢查量為190億次。

1.擴展之旅

奧德修斯的最後挑戰是重新獲得王位! 奧德修斯終於到達了他的伊薩卡(Ithaca)家園,結果發現他的宮殿已被108名求婚者淹沒,這些求婚者正在喝酒,宰殺他的牲畜並向他的妻子佩內洛普求婚。 如果他不加思索地衝進去,那可能會以慘敗告終。 相反,他偽裝成乞g,計劃奪回王位。 佩內洛普(Penelope)向求婚者宣佈,她將嫁給那個可以拉奧德修斯弓的男人,並通過連續十二個軸射出一支箭。 求婚者都嘗試了失敗。 當"乞g"嘗試時,他完成了這一壯舉! 奧德修斯擺脫了自己的偽裝,進行了戰鬥並消滅了所有的求婚者。

同樣,與其嘗試直接跳到故事的結尾並大規模運行Anomalia Machina應用程序,不如將其擴展到越來越大的集群上是明智的。 目的是(a)在投入大量資源之前發現如何擴展它,以及(b)記錄其如何隨著群集大小的增加進行擴展。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

奧德修斯消除了佩內洛普的求婚者

1.1準備

在最初的可伸縮性測試中,我簡化了方法,以專注於在Kubernetes集群上進行Cassandra可伸縮性和應用程序調整。 為此,我使用了一個小型生產型Kafka集群(每個節點3個節點,每個節點4個核)來"重放"我以前發送給它的相同事件。 事件重新處理是Kafka的豐富用例,我們在博客探索Apache Kafka"城堡" B部分:事件重新處理中進行了探討。 與生產者相比,Kafka消費者對Kafka集群的負載也非常低,因此這確保了Kafka不會成為瓶頸,並且在我擴展系統的其餘部分時,結果是可重複的。

為了準備好Anomalia Machina應用程序的擴展規模,我在上一個博客中做了一些改進。 鑑於用Prometheus監視的Pod數量可能會增加,以前在筆記本電腦上運行Prometheus服務器並手動將每個Pod IP地址添加到配置文件的方法不再可行。 為了解決這個問題,我將Prometheus部署到Kubernetes集群,並使用Prometheus Operator進行自動化Pod監視。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

Prometheus Kubernetes運算符

第一步是安裝Prometheus Operator並使其運行。 我是通過複製yaml文件來完成此操作的:

coreos / prometheus-operator

Prometheus Operator在Kubernetes上創建/配置/管理Prometheus集群-coreos / prometheus-operator

到我的本地計算機,並運行:

kubectl應用-f bundle.yaml

Prometheus運算符通過動態檢測和監視帶有與選擇器匹配的標籤的Pod來工作。 需要一些組裝。

· 服務對象監視器Pod(帶有與選擇器匹配的標籤)

· ServiceMonitor發現服務對象

· Prometheus對象指定應包括哪些ServiceMonitors

· 要訪問Prometheus實例,必須將其暴露給外界(例如,通過使用NodePort類型的服務)

· 第一次不起作用時,您很可能需要為Prometheus和Prometheus Operator創建基於角色的訪問控制規則

有關所有步驟和示例,請參閱此文檔。

但是,現在Prometheus在Kubernetes集群中運行,要監視獨立的AWS EC2實例上運行的Kafka負載生成器(Kafka生產者應用程序)變得不容易。 我還想確保為Kafka生產者提供足夠的資源,因此一個顯而易見的解決方案是將其部署到Kubernetes集群中。 事實證明這很容易,只需對現有Kubernetes部署工件進行簡單的複製/編輯即可從生產者jar文件中創建新的Deployment類型。 現在可以通過擴展Pod的數量輕鬆地增加Kafka生產者的負載。 這也使Prometheus操作員可以同時監視生產者應用程序和檢測器管道應用程序。 現在,我們準備繼續進行擴展之旅。

1.2預調整

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

預先調校:" La Jamais Contente",1899年第一輛達到100 km / h的汽車(電動,68hp)

有幾個旋鈕可用來調整應用程序的異常檢測器管道部分。 每個Pod都有2個Kafka使用者(在線程池中)從Kafka集群讀取事件,而另一個線程池則執行Cassandra寫入,讀取並運行異常檢測算法。 每個Pod最初會啟動一個Cassandra連接,但如果連接過載(沒有),則可以由Cassandra Java驅動程序自動增加。

因此,有2個參數對於按比例增加應用程序進行調整至關重要:(1)Kafka使用者數量,以及(2)每個Pod的檢測器線程數量。 Kafka主題的分區數必須大於或等於Kafka使用者的數目,以確保每個使用者都可以接收數據(超出分區數的任何使用者都將處於空閒狀態)。 每個Pod的檢測器線程數量至關重要,因為吞吐量以"不可思議"的數量達到峰值,如果線程數量更多或更少,則數量下降。

我最初認為我可以通過相對簡單的過程進行橫向擴展:(1)在3節點群集的低負載(1 Pod)下調整檢測器線程池,然後(2)在每個較大的群集中增加Pods的數量 直到獲得最大吞吐量。 我使用這種方法來增加群集大小,將節點數量從3個增加到6個,12個和24個。 令人驚訝的是,這給出了該圖所示的亞線性可伸縮性。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

考慮到Cassandra具有更多節點的理論上理想的線性可伸縮性,這種應用程序調整方法顯然存在問題。

1.3調整後

為了更清楚地瞭解實際情況,我在Prometheus中顯示了更多指標,包括Kafka使用者的吞吐量,檢測器線程池以及不同的檢測器事件,包括檢測器運行或未運行(> = 50行,從Cassandra或 <50行),以及從Cassandra讀取的數據行數。 這樣就可以確認業務指標正確(僅當每個ID> = 50行時才報告),並檢查/調整線程池,以使使用者/檢測者的速率保持穩定狀態(一個不領先另一個)。 ,因為這可能不是最佳選擇。

然後,我重新運行該基準,並使用額外指標提供的增加的可見性,改進了針對較小集群大小迭代的調整方法(一次僅添加3個節點,而不是如上所述的兩倍)。 使用Instaclustr管理的Cassandra集群,可以輕鬆地向現有的Cassandra集群添加額外的節點,因為控制檯上有一個按鈕可以自動添加額外的節點。 這也加速了基準測試過程,因為我不必在每次運行開始時就將數據重新加載到Cassandra中(就像我每次對一個新集群所做的那樣),因為它已經就位。

請注意,在這些實驗中,我使Kafka集群的大小保持相同(因為來自Kafka使用者的負載極小,CPU小於10%),但是我確實通過增加每次CPU的Kubernetes工作者節點的數量來增加Kubernetes集群的大小。 利用率超過50%,以確保應用程序資源不會成為瓶頸。

下圖顯示了從3到21個節點的集群的調優結果,線性外推到50個節點。 50個節點的預期Pod數量約為100。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

類似的圖形,但是這次顯示了一個50節點集群的Kafka消費者(2 x Pods)的預測數量約為200。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

該圖顯示了隨著增加的Cassandra節點調整每個吊艙的檢測器線程數的結果,對於50個節點的Cassandra群集,線性外推法預測每個Pod超過250個線程。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

請注意,在具有相同配置的運行中,結果的差異最多為+/- 10%,並且對於某些中型群集(即,豆莢略多,每個豆莢的線程不足),調整可能不是100%最佳的 。 但是,很可能對7個數據點進行外推可得出較大聚類的合理預測。

所有調優都有幫助嗎? 是的,從3節點到21節點的Cassandra群集,與第一次嘗試相比,我們現在具有明顯更好的可伸縮性,現在接近完美的線性可伸縮性(誤差變化在+/- 10%之內),如圖所示。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

觀察:Apache Cassandra具有完美的線性可擴展性(因為它是一種無共享架構,沒有共享資源會成為增加節點的瓶頸),但是您需要付出一些努力來優化應用程序。 Cassandra將處理大量連接,但是為了獲得良好的可伸縮性,請嘗試通過優化每個連接的使用來最大程度地減少Cassandra連接的總數。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

調整後:快進120年……" Pininfarina Battista"是世界上最快的汽車,在2秒內達到0–100 kph,最高速度350 kph(電動,1,900hp)。

2.大規模結果

· 230萬次寫入Kafka(峰值)

· 每秒220,000次異常檢查(可持續)

· 30分鐘內檢查了4億個事件是否存在異常

· 每天190億次異常檢查

· 640萬事件/總的"系統"吞吐量(峰值)

· 橫跨Cassandra,Kafka和Kubernetes的574個CPU內核

對於最後一次運行,我們重新研究了原始的" Kafka作為緩衝區"用例(以將事件生產者與消費者分離)。 我們希望Kafka集群能夠在幾分鐘內處理至少200萬次寫入/秒,以應對負載高峰,同時使其餘異常檢測管道能夠擴展並以最大容量運行,以儘可能快地處理事件積壓。 。

基於將應用程序最多調整到21個節點的Cassandra集群的經驗,我們希望有足夠的經驗來應對最終的挑戰並擴展到更大的節點(一個48節點的集群)。

2.1 Kafka群集大小調整

根據以上預測,我們似乎需要在Kafka端分配200個分區。 因此,我啟動了一些不同大小的Kafka集群,並嘗試了提高生產者吞吐量和分區數量。

使用6個節點(每個節點4個CPU內核)Kafka集群作為起點,很明顯,寫入吞吐量會隨著分區的增加而顯著下降(這實際上是由於要寫入的分區數量而不是僅僅存在的分區數量) ,因為僅寫入600個分區中的6個分區會產生與只有6個分區相同的吞吐量。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

使用更大的Kafka節點大小(每個節點8個核心)可以使我們進入200個分區的目標範圍(> = 2M寫/秒)(右側橙色條形),但是群集已接近極限,因此我們決定使用 9個節點(每個節點8個CPU內核)的Kafka集群,因為我們不希望Kafka集群成為瓶頸。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

初步測試顯示,對於具有200個分區的9x8 Kafka集群,9個Kafka Producer Pod足以超過2M / s的寫入/秒目標。

2.2 48節點Cassandra群集

為了獲得最終結果,我們使用Instaclustr控制檯在AWS上啟動了一個48節點的Cassandra集群。 然後,我們調整了應用程序線程池(在第2.4節中的圖表中的線程池2),並在使用Instaclustr控制檯監視Prometheus中的應用程序度量以及Kafka和Cassandra群集度量的同時增加了Pods的數量。 我們以100個Pod,每個Pod具有300個檢測器線程(略高於預期,總共提供30,000個檢測器管道應用程序線程)達到了最大異常檢查/秒,並且Cassandra集群在97%的CPU上運行接近平穩( 高於針對生產集群的建議值),而Kafka在CPU上有66%的淨空。

為了測試Kafka作為緩衝區用例,我們從重放現有的Kafka事件切換到讀取新事件,在2分鐘內增加了Kafka生產者的負載,並在終止前將負載保持最大的2分鐘。

經過幾次錯誤的嘗試之後,我發現使用開源Kubernetes工具Weaveworks Scope很有用,以確保一切都按預期進行。 連接到Kubernetes集群很容易,並且支持不同的視圖和節點過濾。 此視圖顯示了主要服務(其中一些我以前配置時遇到的問題),並顯示了Prometheus的正確部署,並通過Prometheus操作員監視了100個Consumer Pod和9個Producer Pod。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

這是最終系統的規格。

群集詳細信息(全部在美國東部北弗吉尼亞州的AWS中運行)

Instaclustr託管的Kafka — EBS:高吞吐量1500 9 x r4.2xlarge-1500(1,500 GB磁盤,61 GB RAM,8核),Apache Kafka 2.1.0,複製因子= 3

Instaclustr管理的Cassandra —特大,48 x i3.2xlarge(1769 GB(SSD),61 GB RAM,8核),Apache Cassandra 3.11.3,複製係數= 3

AWS EKS Kubernetes工作節點-2 x c5.18xlarge(72核,144 GB RAM,25 Gbps網絡),Kubernetes版本1.10,平臺版本eks.3

2.3普羅米修斯和格拉法納州的原始結果

這是原始結果。 如該Prometheus圖所示,此測試的檢測器管道的平均等待時間(從從Kafka讀取事件,直到確定是否為異常)小於460ms。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

下圖顯示了Kafka生產者的增長速度(從1到9個Kubernetes Pods),加載時間為2分鐘,峰值為2.3M事件/秒(這次是Grafana)。 請注意,因為要從多個Pod中檢索每個指標,所以我必須將它們視為堆疊的圖來獲取所有Pod的總指標值。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

此圖顯示異常檢查率達到220,000個事件/秒並持續(直到處理了所有事件)。 Prometheus正在從100個Kubernetes Pods中收集此指標。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

2.4這是一個好結果嗎?

十年前,對於異常檢測系統來說,"提出一系列完整的交易是不切實際的"。 相反,他們建議使用匯總的歷史數據。 但是,我們已經證明,當前的技術不僅可以檢測原始交易中的異常,還可以依靠彙總數據來完成任務。

我們的結果與最近的結果相比如何? 2018年發佈的結果(針對類似的系統)使用240個內核實現了200次異常檢查/秒。 他們使用有監督的異常檢測,這需要對分類器進行訓練(每天一次),因此他們使用了Apache Spark(用於ML,功能工程和分類)以及Kafka和Cassandra。 考慮到資源差異,我們的結果是吞吐量提高了大約500倍,並且實時延遲更快。 由於"功能工程"階段,它們有更多的開銷,並且使用Spark運行分類器的延遲高達200s,使其不適合實時使用。 檢測延遲不到1秒(平均500毫秒),我們的解決方案足夠快,可以提供實時異常檢測和阻止。 如果傳入負載在短時間內超過管道的容量,則處理時間會增加,並且檢測到的潛在異常事務可能需要以不同的方式進行處理。

2.5分析

總而言之,Kafka的最大寫入/秒達到了2.3M / s,而其餘的管道則進行了可持續的220,000次異常檢查/秒。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

如果我們考慮整個系統中的所有事件(即,分佈式系統之間流動的所有事件),則實際數字大於此數字。 在上一個博客中,我們表明,對於每個異常檢查決策,還有許多其他事件對此做出了貢獻。 對於負載峰值情況,我們需要考慮較大的Kafka負載峰值(Y,藍線,2.3M / s)和較小的檢測器管線速率(Z,橙色線,220,000 / s):

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

總的系統吞吐量峰值計算與之前的穩態計算略有不同,因為負載峰值(Y)由Kafka負載生成器產生(步驟1),並寫入Kafka集群(步驟2),直到管道的其餘部分 趕上並處理它們(步驟3-10,以最大速率Z)。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

因此,峰值系統吞吐量為(2 x峰值Kafka負載)+(8 x異常檢查/秒)=(2 x 2.3M)+(8 x 0.22)= 640萬事件/ s,如下圖所示:

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

這是一些更大的數字。 進入Kafka的4分鐘事件(啟動和加載)產生了4億個事件要檢查,異常檢測流程需要30分鐘來處理所有事件。 下圖顯示了這種情況(請注意,速率以百萬每分鐘為單位):

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

更為現實的情況是,假設在負載峰值之上,平均背景負載為連續運行的管道容量的50%(110,000個事件/秒)。 然後,由於負載峰值,需要60分鐘清除所有事件,如下圖所示:

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

在什麼情況下有用? 想象一下,我們有一個SLA,可以在不到1秒鐘的時間內處理每週99%的事件,延遲上限為1小時。 假設這樣的負載高峰事件相對不頻繁(例如每週一次),則這些方案可以滿足99.4%的SLA(1小時為一週的0.6%)。

對於我們的最終編號和最大編號,下圖顯示了我們最大的具有48個Cassandra節點的Anomalia Machine系統具有足夠的能力來處理每天190億次異常檢查。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

2.6 Anomalia Machina有多大?

我們最後的Anomalia Machina"機器"有多大? 這是一張圖表,顯示了業務指標與Cassandra,Kafka和Kubernetes集群使用的核心數量以及整個系統的關係。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

取得最大結果的完整機器(48個Cassandra節點)共有574個核心。 這是很多核心! 手動管理這種規模的系統的配置和監視將是一項巨大的工作。 通過將Instaclustr管理的Cassandra和Kafka集群(自動配置和監視)以及Kubernetes(AWS EKS)管理的集群相結合,可以輕鬆按需啟動集群,運行應用幾個小時,以及 完成後刪除資源,以節省大量成本。 使用Prometheus Kubernetes運算符監視100多個運行該應用程序的Pod可以順利進行,並增強了對應用程序的可見性,並有必要訪問基準指標以調整和報告結果。

該系統(與大小無關)每秒每核心發送約400次恆定的異常檢查。

值得注意的是,儘管Kafka集群正在處理的負載峰值(2.3M / s)比Cassandra集群(220,000 / s)大一個數量級,但Cassandra集群的大小是Kafka集群的5倍以上。使用" Kafka作為緩衝區"來應對負載峰值,而不是將Cassandra群集的大小增加一個數量級(即從48個節點增加到480個節點),顯然更有效(更容易,更便宜,更有彈性)。匆忙。但是,如果有足夠的警告,則可以動態調整Cassandra群集的大小。 Instaclustr對Apache Cassandra的動態調整大小可以在數分鐘之內進行垂直擴展(在整個集群中為20–30分鐘,但容量幾乎立即開始增加)。容量最大的增長是從r4.large(2核)到r.4xlarge(16核),容量增加了8倍。如果與Kafka一起使用並與之配合使用,則對於此場景就足夠了,並且將大大加快事件積壓的處理速度。我在一個較小的群集上嘗試了一下,一次調整一個節點的大小(同時調整大小也是一種選擇),並且它可以完美地工作。為此,您需要(1)創建可調整大小的Instaclustr Cassandra集群,(2)具有足夠的節點以啟用垂直縮放以滿足目標負載,以及(3)在Kubernetes上啟用應用程序的彈性縮放(這是另一個挑戰)。

2.7規模承受能力

我們已經證明,我們的系統可以很好地擴展,以每天處理190億個事件,甚至對於一個大型企業來說也綽綽有餘。 那麼,運行這種規模的異常檢測系統的運營成本是多少? 該圖顯示,使用按需AWS實例的基本基礎架構每天僅花費約1,000美元。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

該圖還顯示,可以輕鬆地按比例放大或縮小系統以適應不同的業務需求,並且基礎架構成本將按比例縮放。 例如,我們運行的最小系統仍然每天檢查15億個事件,而AWS基礎設施每天的成本僅為100美元。

可以肯定的是,總擁有成本會更高(包括異常檢測應用程序的研發,應用程序的日常維護,託管服務成本等)。 假設每天的實際總成本為10,000美元(基礎設施成本的10倍),則該系統可以對每花費190萬美元的事件進行異常檢查。


結語

正如荷馬史詩般的演繹,是無限的想象力(例如英雄,神靈,像獨眼的食人獨眼巨人波利夫摩斯之類的怪物,誘人的警報器,六頭海蛇Scylla,若蟲的Calypso), 可擴展的異常檢測器系統僅受您的想象力限制! 在本系列中,我們證明了開源技術(Kafka,Cassandra,Kubernetes,Prometheus)與Instaclustr管理的Kafka和Cassandra集群的組合可以擴展以檢測每天隱藏在數十億個事件中的異常現象,從而提供了顯著的收益/成本比,並且 實際節省的資金,並且適用於許多應用領域。 您是否有荷馬尺度的想象力?

後記

最近,我有機會接觸到" Kubernetes"(舵手)相關技術。 蘇瓦(Suva)的斐濟博物館(Fijian Museum)擁有最後一個斐濟洋雙殼獨木舟(Drua-Ratu Finau,14m船身)。 這是一張帶有操縱桿(uli)的照片,它的長度超過3m,但可以由一個舵手管理。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

較大的獨木舟(3600萬個船體)的操縱槳更大(只要此獨木舟),並且需要多達4位舵手來操縱它們(在繩索的幫助下)並使它們保持在正常軌道上。

使用Apache Kafka和Cassandra進行大規模可擴展的異常檢測

要了解有關我們的開源技術託管平臺的更多信息,請聯繫我們或註冊免費試用版。

(本文翻譯自Instaclustr的文章《Anomalia Machina 10 — Final Results: Massively Scalable Anomaly Detection with Apache Kafka and Cassandra》,參考:https://medium.com/swlh/anomalia-machina-10-final-results-massively-scalable-anomaly-detection-with-apache-kafka-and-59c11c249fa3)


分享到:


相關文章: