[Python]利用ZooKeeper構建分佈式定時任務

本文涉及的源代碼路徑:https://github.com/xiaoquqi/openstackclient-demo/tree/master/tooz

一、目前現狀及存在的問題

在實際業務系統中,經常有需要定時執行的任務,例如任務狀態的定時更新、定時發送狀態信息等。在我們的雲遷移產品中,允許用戶可以設定週期同步規則,定期執行數據同步並調用雲平臺接口執行快照操作。在單機版本中,通常在同一時間點併發任務量較少的情況下,問題並不是很突出,但是隨著我們將雲遷移服務從單機版本改造為平臺版本後,當多個用戶的多臺主機同時觸發快照任務時,一方面傳統的設計方式就成為了瓶頸,無法保證用戶的同步任務在同一時間點被觸發(需要排隊);另外一方面,目前Active-Passive(簡稱AP方式)的高可靠部署方式無法利用集群橫向擴展能力,無法滿足高併發的要求。

[Python]利用ZooKeeper構建分佈式定時任務

軟件架構設計

目前雲遷移平臺的各個服務模塊在設計上使用了OpenStack方式,即大部分模塊複用了類似Nova的實現框架。即API層直接集成oslo.service中定義好的WSGI Service基類,Worker採用了olso.service中定義好的Service基類,即Eventlet協程方式,API與Worker通訊使用RabbitMQ,API南向接口除少量直接更新數據庫操作採用同步接口外,其餘所有接口全部使用異步方式。API發送請求後,得到202 Accepted回覆,後續通過GET接口不斷輪詢任務接口等到任務完成。

高可靠部署

根據OpenStack官方的HA部署文檔(https://docs.openstack.org/ha-guide/),將服務分為無狀態和有狀態兩種。無服務狀態只需要直接部署多份即可,有狀態服務往往需要通過Pacemaker控制副本數量,來保證高可靠。在雲遷移平臺部署中,我們將全部服務部署於K8S集群中,所以並不需要Pacemaker+Corosync這樣的組件(Pacemaker節點上線為16)。但是,由於需要保持定時任務在單一節點被觸發(避免任務被重複執行),所以承載定時快照的模塊只能同時存在一個容器在運行,無法構成Active-Active(簡稱AA方式)模式。這樣的部署方式,也造成了上述提到的AP模式對擴展性的瓶頸。

二、問題思路及解決方案

思路一、利用消息隊列解耦任務分配與任務執行

從上述對現狀的描述,我們不難看出,現有任務分配與任務執行是在同一個任務中執行的,當存在大量任務時,任務執行會對任務產生產生很大的影響。同時,由於任務執行唯一性的需要,在部署上只能採用上述的AP模式,導致任務無法由多個任務同時執行。

所以,我們可以將任務分解為分配和執行兩個階段。任務分配上,單純的進行任務生成,由於任務生成相對較快,生成後的任務發送至消息隊列,由無狀態性的Worker接收後執行。這樣就解決了單點執行的效率低下問題。

但是這樣的解決方案仍然存在缺陷,我們在任務生成的模塊仍然必須需要採用AP模式部署,來保證任務的唯一性。如果在任務數量非常龐大時,該部分仍然是一個瓶頸;另外一方面這樣的實現方式,我們需要將任務生成部分單獨拆分出一個模塊,同時增加了開發和部署上的複雜度,所以我們來看一下第二種解決思路。

[Python]利用ZooKeeper構建分佈式定時任務

思路二、利用Zookeeper構建可擴展的分佈式定時任務

為了解決思路一的侷限性,我們本質上要解決的是任務執行的分佈式問題,即如何讓Worker不重複的判定任務的歸屬後再執行,由被動改為主動。


我們來看以下幾種場景:

1、假定我們現在有3個Worker可以用於任務生成,在某一個時間點,將同時產生100個任務。如何由這3個Worker主動產生屬於自身負責的任務?

2、我們知道大部分雲平臺目前都有云原生的彈性擴展服務,如果我們結合雲平臺的彈性擴展服務自動將我們用於任務生成的Worker動態進行調整時,例如變為6個時,還能保證這100個任務能夠被自動的由6個節點不重複的產生呢?

3、當負載降低後,節點數量由6個變為3個後,如何恢復場景1的狀態呢?保證任務不漏生成呢?

[Python]利用ZooKeeper構建分佈式定時任務

如果想達到以上場景需求,需要以下幾個條件:

1、節點之間能夠準確知道其他節點的存在——利用Zookeeper進行服務發現

2、儘量合理的進行任務(對象)分佈,同時兼顧節點增加和減少時,降低對象分配時的位移——利用一致性哈希環

三、技術要點

1、Zookeeper

對於Zookeeper的解釋網絡上有各種各樣的詳細集成,這裡就不再贅述了,這裡我直接引用了這篇文章(https://www.jianshu.com/p/50becf121c66)中開頭的內容:

官方文檔上這麼解釋zookeeper,它是一個分佈式服務框架,是Apache Hadoop 的一個子項目,它主要是用來解決分佈式應用中經常遇到的一些數據管理問題,如:統一命名服務、狀態同步服務、集群管理、分佈式應用配置項的管理等。

上面的解釋有點抽象,簡單來說zookeeper=文件系統+監聽通知機制。

[Python]利用ZooKeeper構建分佈式定時任務

從我們應用場景的角度看,Zookeeper幫我們解決了Worker之間相互認識的過程,及時、準確的告訴我們:到底現在有多少個和我相同的活躍節點存在。至於底層是如何實現的,感興趣的同學可以查看具體的Zookeeper實現原理文檔,這裡只介紹與我們實現相關的內容。

2、一致性Hash

又是一個經典的算法,相關的文章也很多,這裡推薦大家幾篇,這裡摘抄出對理解我們實現有價值的內容。

參考文檔:

《面試必備:什麼是一致性Hash算法?》https://zhuanlan.zhihu.com/p/34985026

《五分鐘看懂一致性哈希算法》https://juejin.im/post/5ae1476ef265da0b8d419ef2

《一致性hash在分佈式系統中的應用》http://www.firefoxbug.com/index.php/archives/2791/


2.1 關於一致性哈希算法

一致性哈希算法在1997年由麻省理工學院的Karger等人在解決分佈式Cache中提出的,設計目標是為了解決因特網中的熱點(Hot spot)問題,初衷和CARP十分類似。一致性哈希修正了CARP使用的簡單哈希算法帶來的問題,使得DHT可以在P2P環境中真正得到應用。但現在一致性hash算法在分佈式系統中也得到了廣泛應用。

2.2 一致性哈希算法在緩存技術中的應用

[Python]利用ZooKeeper構建分佈式定時任務

上述的方式雖然提升了性能,我們不再需要對整個Redis服務器進行遍歷!但是,使用上述Hash算法進行緩存時,會出現一些缺陷,主要體現在服務器數量變動的時候,所有緩存的位置都要發生改變!

試想一下,如果4臺緩存服務器已經不能滿足我們的緩存需求,那麼我們應該怎麼做呢?很簡單,多增加幾臺緩存服務器不就行了!假設:我們增加了一臺緩存服務器,那麼緩存服務器的數量就由4臺變成了5臺。那麼原本hash(a.png) % 4 = 2 的公式就變成了hash(a.png) % 5 = ? , 可想而知這個結果肯定不是2的,這種情況帶來的結果就是當服務器數量變動時,所有緩存的位置都要發生改變!換句話說,當服務器數量發生改變時,所有緩存在一定時間內是失效的,當應用無法從緩存中獲取數據時,則會向後端數據庫請求數據(還記得上一篇的《緩存雪崩》嗎?)!

同樣的,假設4臺緩存中突然有一臺緩存服務器出現了故障,無法進行緩存,那麼我們則需要將故障機器移除,但是如果移除了一臺緩存服務器,那麼緩存服務器數量從4臺變為3臺,也是會出現上述的問題!

所以,我們應該想辦法不讓這種情況發生,但是由於上述Hash算法本身的緣故,使用取模法進行緩存時,這種情況是無法避免的,為了解決這些問題,Hash一致性算法(一致性Hash算法)誕生了!

2.3 一致性哈希在緩存中的應用

初始狀態,將節點映射到哈希環中

[Python]利用ZooKeeper構建分佈式定時任務


將對象映射到換後,找到負責處理的Node節點。

[Python]利用ZooKeeper構建分佈式定時任務


容錯性,Node C出現故障後,只需要將Object C遷移到Node D上。

[Python]利用ZooKeeper構建分佈式定時任務


增加節點,此時增加了Node X,在Node C右側,那麼此時只有Object C需要移動到Node X節點。

[Python]利用ZooKeeper構建分佈式定時任務

3、tooz和kazoo

Python中操作zookeeper的項目叫kazoo(https://kazoo.readthedocs.io/en/latest/)。

tooz是OpenStack中為簡化開發人員操作分佈式系統一致性所開發的組件,利用底層組件抽象出一致性組成員管理、分佈式鎖、選舉、構建哈希環等。tooz除支持zookeeper作為後端,還可以支持Memcached、Redis、IPC、File、PostgreSQL、MySQL、Etcd、Consul等。

有關於tooz的發展歷史可以參考:https://julien.danjou.info/python-distributed-membership-lock-with-tooz/

這裡我們主要使用tooz操作zookeeper實現我們的一致性組及一致性哈希。

4、oslo相關項目

這幾年一直在做OpenStack項目,從OpenStack項目中學習到很多設計、架構、研發管理等各種新知識、新理念。oslo項目就是在OpenStack不斷的迭代中產生的公共項目庫,這些庫可以讓你非常輕鬆的構建基於Python的構建近似於OpenStack的分佈式、可擴展的微服務系統。

之前在從事OpenStack開發培訓過程中,有專門的一節課去講解OpenStack中用到的公共庫,其中oslo相關項目就是非常重要的一部分內容。olso項目設計的庫非常多,在這個內容中會涉及到oslo.config、oslo.log、oslo.service、oslo.utils和oslo.messaging項目。嚴格意義上來說,為了更精準控制任務,我們還應該引入oslo.db項目由數據庫持久化的維護任務運行狀態,包括任務回收等工作,但是本次內容主要講解的是zookeeper,所以這部分的內容需要開發者在實際項目中去實現。

關於olso開發的內容,我會以視頻課程的形式為大家講解,敬請期待。

四、實現過程

1、Zookeeper部署

<code>docker-compose -f zookeeper.yml -d up/<code>


啟動完成後,將使用本地的三個容器作為zookeeper的三個節點和三個不同的端口(2181/2182/2183)便於zookeeper連接。如果在生產環境中部署時,可以使用雲原生服務或部署在多個可用區的方式,保證高可靠。

[Python]利用ZooKeeper構建分佈式定時任務

Zookeeper常用命令行

進入容器,就可以使用zkCli.sh進入zookeeper的CLI模式。如果是初次接觸zookeeper,可以把zookeeper理解成一個文件系統,這裡我們常用的命令就是ls。

<code>docker exec -it zookeeper_zoo1_1 bash
cd bin
zkCli.sh/<code>


看到這樣的提示,就表示連接成功了。

[Python]利用ZooKeeper構建分佈式定時任務

如上面提到的zookeeper的存儲結構所示,我們先從根節點(/)進行獲取。

<code>ls //<code>


此時返回

[Python]利用ZooKeeper構建分佈式定時任務


這裡zookeeper目錄屬於保留的目錄,我們來看一下tooz的內容。

<code>ls /tooz/<code>


此時返回

[Python]利用ZooKeeper構建分佈式定時任務


如果我們想繼續查看distribution_tasks的內容,可以繼續使用ls命令獲取。

<code>ls /tooz/distribution_tasks/<code>


通常我們會為每一個加入的節點取一個唯一的標識,當節點加入後我們使用ls命令就可以看到,如果離開了,則返回為空。

[Python]利用ZooKeeper構建分佈式定時任務


zookeeper常用的命令還包括get,stat等獲取value和更詳細的信息,還包含更新節點操作set和刪除節點rm。這裡面就不做一一介紹了,我們直接操作zookeeper主要是為了幫助大家更好的理解程序邏輯。

具體的命令行信息可以參考:https://www.tutorialspoint.com/zookeeper/zookeeper_cli.htm

2、tooz基本使用方法

關於tooz的兩個示例主要來自於這篇博客:https://dzone.com/articles/scaling-a-polling-python-application-with-tooz

原文中的例子是有些Bug的,這裡面進行重新進行了優化和整理,並且使用zookeeper替代etcd3驅動。

2.1 組成員(tooz/test_tooz/test_group_members.py)

在這個例子中,我們主要為大家演示tooz如何進行組成員的管理。結合我們自身的需求,這裡的成員就是每一個Worker。通過這個例子我們將觀察三種不同場景的變化:

1、初始狀態下,我們只能看到一個成員;

2、當啟動了一個新的進程時,第一個成員馬上會發現有第二個成員的加入;

3、同時,當我們用CTRL + C結束某一個進程時,另外一個活著的進程會立即發現組成員的變化。


時序圖

這裡為了更直觀表達,用時序圖來說明程序的運行邏輯。


[Python]利用ZooKeeper構建分佈式定時任務

執行效果

第一個成員

<code>python test_group_members.py client1 group1/<code>
[Python]利用ZooKeeper構建分佈式定時任務


第二個成員加入,觀察第一個成員的標準輸出,為了觀察加入集群的時間,我們加入了date

<code>date && python test_group_members.py client2 group1/<code>
[Python]利用ZooKeeper構建分佈式定時任務

第一個腳本的標準輸出,在16:07:27秒的時候加入了集群:

[Python]利用ZooKeeper構建分佈式定時任務


將第二個成員關閉,直接在第二個成員腳本按CTRL + C,首先觀察第二個成員的輸出:

[Python]利用ZooKeeper構建分佈式定時任務

第一個成員的輸出,在16:08:51分時,集群中已經沒有了第二個成員了:

[Python]利用ZooKeeper構建分佈式定時任務

2.2 一致性哈希(tooz/test_tooz/test_ping.py)

這個模擬測試中,使用分佈式任務去ping某一個C類網段(255個IP地址)中的全部IP地址,如果由一個任務去完成,那麼只能順序執行,無法滿足併發需求,這裡採用一致性哈希算法,讓任務分佈在各個Worker上。為了節省時間,我們將原有程序中的實際ping換成了time.sleep等待方式。

另外在程序啟動後,我們默認等待10秒等待其他成員(member)加入,在實際開發過程中,還需要對任務的狀態進行嚴格控制,防止同一任務重複被執行,在演示代碼中主要偏重演示分佈式,所以並沒有在任務狀態上增加過多處理。

時序圖


[Python]利用ZooKeeper構建分佈式定時任務

代碼需要說明的幾點:

0、在程序開始時,我們默認等待了10秒,等待其他節點加入,如果在循環開始後,再有新加入的節點時,由於並不知道第一個節點已經處理過的任務,所以在第二個Worker加入後根據當時哈希環對之前的任務重新分配並執行,造成了重複執行,這個問題需要通過額外的手段(例如數據庫記錄先前執行的任務狀態)監控任務狀態來防止任務重新執行。

1、代碼中使用了tooz內置的Hash環,但是也可以在外部自己構建哈希環,我們在後續最終的例子中還是採用了外部構建哈希環的方法。

2、Tooz partitioner依賴於watchers,所以在每次循環的時候必須要調用run_watchers即使獲取成員的加入和離開。

3、無論是group還是member在變量傳遞時都要變成bytes類型,這樣可以確保對象的唯一性,所以在代碼處理上都用到了encode()方法。

4、__tooz_hash__方法需要在使用Partition時自己實現,能夠唯一標識出對象的方法,例如ID、名稱等信息。

執行效果

我們分別使用兩個不同的窗口,同時啟動兩個Worker,我們可以很明顯的看到主機被分配到兩個不同的Worker中。

<code>python test_ping.py client1 group1
python test_pring.py client2 group1/<code>
[Python]利用ZooKeeper構建分佈式定時任務


加入第三個Worker,可以看到一部分任務又被分配給了第三個Worker上

<code>python test_ping.py client3 group2/<code>
[Python]利用ZooKeeper構建分佈式定時任務


暫停第二個Worker,我們看到第二個Worker被停止後,任務重新被平衡到Worker1和Worker2上。

[Python]利用ZooKeeper構建分佈式定時任務

3、構建分佈式定時任務

為了保持代碼的兼容性,所以這裡的實現是基於目前OpenStack體系的實現。另外,將任務發送給消息的部分在這個例子中並沒有體現。示例代碼仍然重複實現上述ping的例子,部分代碼參考於Sahara項目的實現。


由於代碼量較大,這裡不貼出全部代碼,僅僅對核心實現進行分析,完整代碼請參考:https://github.com/xiaoquqi/openstackclient-demo/tree/master/tooz/distribute_periodic_tasks

代碼結構

<code>.
├── coordinator.py -> 一致性哈希的實現,該類中並沒有直接使用上述tooz的partition,而是自己重新實現了HashRing
├── periodic.py -> 定時任務,基於oslo_service的PeriodicTasks基類
├── service.py -> Service類,繼承於oslo.service的Service基類
└── test_periodic_task.py -> 程序入口/<code>


coordinator.py

Coordinator是關鍵實現,所以這裡重點對該類進行解釋,在period task中需要調用coordinator即可實現分佈式觸發定時任務。


在coordinator.py中共實現了兩個類,Coordinator和HashRing。

1、Coordinator類主要是針對tooz中對group members相關操作的封裝,類似我們在tooz中的第一個例子;

2、HashRing是繼承於Coordinator類,在功能上接近於tooz中Hash和Partition的實現,但是更簡潔,tooz構建HashRing的用的PartitionNumber是32(2^5),而我們用的是40,更大的數字會帶來更均勻的分佈但是會導致構建成本增加

3、HashRing中最重要的方法就是get_subset,通過映射到HashRing上的ID來判斷Object的歸屬Worker


[Python]利用ZooKeeper構建分佈式定時任務

運行效果

分別在兩個Terminal中運行腳本,可以看到Host被均勻的分佈在兩個Worker中執行。

<code>python test_periodic_task.py/<code>
[Python]利用ZooKeeper構建分佈式定時任務

五、總結

通過以上實例,我們瞭解瞭如何通過zookeeper構建分佈式系統並進行任務調度,當然zookeeper在分佈式系統還有更多的應用場景值得我們去學習。另外,OpenStack中很多抽象出來的模塊對快速構建Python分佈式系統是非常有幫助的,值得我們學習。


分享到:


相關文章: