圖解 Kubernetes Deployment Controller 工作原理與流程

熱烈歡迎你,相識是一種緣分,Echa 哥為了你的到來特意準備了一份驚喜,k8s學習資料《 》

Deployment 與控制器模式

在 K8s 中,pod 是最小的資源單位,而 pod 的副本管理是通過 ReplicaSet(RS) 實現的;而 deployment 實則是基於 RS 做了更上層的工作。


圖解 Kubernetes Deployment Controller 工作原理與流程

這就是 Kubernetes 的控制器模式,頂層資源通過控制下層資源,來拓展新能力。deployment 並沒有直接對 pod 進行管理,是通過管理 rs 來實現對 pod 的副本控制。deployment 通過對 rs 的控制實現了版本管理:每次發佈對應一個版本,每個版本有一個 rs,在註解中標識版本號,而 rs 再每次根據 pod template 和副本數運行相應的 pod。deployment 只需要保證任何情況下 rs 的狀態都在預期,rs 保證任何情況下 pod 的狀態都在預期。

K8s 是怎麼管理 Deployment 的

瞭解了 deployment 這一資源在 K8s 中的定位,我們再來看下這個資源如何達到預期狀態。

Kubernetes 的 API 和控制器都是基於水平觸發的,可以促進系統的自我修復和週期協調。

水平觸發這個概念來自硬件的中斷,中斷可以是水平觸發,也可以是邊緣觸發:

水平觸發 : 系統僅依賴於當前狀態。即使系統錯過了某個事件(可能因為故障掛掉了),當它恢復時,依然可以通過查看信號的當前狀態來做出正確的響應。 邊緣觸發 : 系統不僅依賴於當前狀態,還依賴於過去的狀態。如果系統錯過了某個事件(“邊緣”),則必須重新查看該事件才能恢復系統。

Kubernetes 水平觸發的 API 實現方式是:控制器監視資源對象的實際狀態,並與對象期望的狀態進行對比,然後調整實際狀態,使之與期望狀態相匹配。

水平觸發的 API 也叫聲明式 API,而監控 deployment 資源對象並確定符合預期的控制器就是 deployment controller,對應的 rs 的控制器就是 rs controller。

Deployment Controller

架構

首先看看 DeploymentController 在 K8s 中的定義:

<code>type DeploymentController struct {
\trsControl controller.RSControlInterface
\tclient clientset.Interface
\teventRecorder record.EventRecorder

\tsyncHandler func(dKey string) error
\tenqueueDeployment func(deployment *apps.Deployment)

\tdLister appslisters.DeploymentLister
\trsLister appslisters.ReplicaSetLister
\tpodLister corelisters.PodLister

\tdListerSynced cache.InformerSynced
\trsListerSynced cache.InformerSynced
\tpodListerSynced cache.InformerSynced

\tqueue workqueue.RateLimitingInterface
}
複製代碼/<code>

主要包括幾大塊內容:

  • rsControl 是一個 ReplicaSet Controller 的工具,用來對 rs 進行認領和棄養工作;
  • client 就是與 APIServer 通信的 client;
  • eventRecorder 用來記錄事件;
  • syncHandler 用來處理 deployment 的同步工作;
  • enqueueDeployment 是一個將 deployment 入 queue 的方法;
  • dLister、rsLister、podLister 分別用來從 shared informer store 中獲取 資源的方法;
  • dListerSynced、rsListerSynced、podListerSynced 分別是用來標識 shared informer store 中是否同步過;
  • queue 就是 workqueue,deployment、replicaSet、pod 發生變化時,都會將對應的 deployment 推入這個 queue,syncHandler() 方法統一從 workqueue 中處理 deployment。

工作流程

接下來看看 deployment controller 的工作流程。


圖解 Kubernetes Deployment Controller 工作原理與流程


deployment controller 利用了 informer 的工作能力,實現了資源的監聽,同時與其他 controller 協同工作。主要使用了三個 shared informer —— deployment informer、rs informer、pod informer。

首先 deployment controller 會向三個 shared informer 中註冊鉤子函數,三個鉤子函數會在相應事件到來時,將相關的 deployment 推進 workqueue 中。

deployment controller 啟動時會有一個 worker 來控制 syncHandler 函數,實時地將 workqueue 中的 item 推出,根據 item 來執行任務。主要包括:領養和棄養 rs、向 eventRecorder 分發事件、根據升級策略決定如何處理下級資源。

workqueue

首先看看 workqueue,這其實是用來輔助 informer 對事件進行分發的隊列。整個流程可以梳理為下圖。


圖解 Kubernetes Deployment Controller 工作原理與流程


可以看到,workqueue 分為了三個部分,一是一個先入先出的隊列,由切片來實現;還有兩個是名為 dirty 和 processing 的 map。整個工作流程分為三個動作,分別為 add、get、done。

add 是將消息推入隊列。消息時從 informer 過來的,其實過來的消息並不是事件全部,而是資源 key,即 namespace/name,以這種形式通知業務邏輯有資源的變更事件過來了,需要拿著這個 key 去 indexer 中獲取具體資源。如上圖綠色的過程所示,在消息進行之後,先檢查 dirty 中是否存在,若已存在,不做任何處理,表明事件已經在隊列中,不需要重複處理;若 dirty 中不存在,則將該 key 存入 dirty 中(將其作為 map 的鍵,值為空結構體),再推入隊列一份。

get 是 handle 函數從隊列中獲取 key 的過程。將 key 從隊列中 pop 出來的同時,會將其放入 processing 中,並刪除其在 dirty 的索引。這一步的原因是將 item 放入 processing 中標記其正在被處理,同時從 dirty 中刪除則不影響後面的事件入隊列。

done 則是 handle 在處理完 key 之後,必須執行的一步,相當於給 workqueue 發一個 ack,表明我已經處理完畢,該動作僅僅將其從 processing 中刪除。

有了這個小而美的先入先出的隊列,我們就可以避免資源的多個事件發生時,從 indexer 中重複獲取資源的事情發生了。下面來梳理 deployment controller 的具體流程。

replicaSet 的認領和棄養過程

在 deployment controller 的工作流程中,可以注意到除了 deployment 的三個鉤子函數,還有 rs 和 pod 的鉤子函數,在 rs 的三個鉤子函數中,涉及到了 deployment 對 rs 的領養和棄養過程。

rs 認親

首先來看 rs 的認親過程:


圖解 Kubernetes Deployment Controller 工作原理與流程


在 rs 的三個鉤子函數中,都會涉及到認親的過程。

當監聽到 rs 的變化後,會根據 rs 的 ownerReferences 字段找到對應的 deployment 入 queue;若該字段為空,意味著這是個孤兒 rs,啟動 rs 認親機制。

認親過程首先是遍歷所有的 deployment,判斷 deployment 的 selector 是否與當前 rs 的 labels 相匹配,找到所有與之匹配的 deployment。

然後判斷總共有多少個 deployment,若為 0 個,就直接返回,沒有人願意認領,什麼都不做,認領過程結束;若大於 1 個,拋錯出去,因為這是不正常的行為,不允許多個 deployment 同時擁有同一個 rs;若有且僅有一個 deployment 與之匹配,那麼就找到了願意領養的 deployment,將其入 queue。

addReplicaSet() 和 updateReplicaSet() 的認領過程類似,這裡只展示 addReplicaSet() 的代碼:

<code>func (dc *DeploymentController) addReplicaSet(obj interface{}) {
\trs := obj.(*apps.ReplicaSet)

\tif rs.DeletionTimestamp != nil {
\t\tdc.deleteReplicaSet(rs)
\t\treturn
\t}

\tif controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
\t\td := dc.resolveControllerRef(rs.Namespace, controllerRef)
\t\tif d == nil {
\t\t\treturn
\t\t}
\t\tklog.V(4).Infof("ReplicaSet %s added.", rs.Name)
\t\tdc.enqueueDeployment(d)
\t\treturn

\t}

\tds := dc.getDeploymentsForReplicaSet(rs)
\tif len(ds) == 0 {
\t\treturn
\t}
\tklog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
\tfor _, d := range ds {
\t\tdc.enqueueDeployment(d)
\t}
}
複製代碼/<code>

deployment 領養和棄養


圖解 Kubernetes Deployment Controller 工作原理與流程


deployment 對 rs 的領養和棄養過程,是發生在從 workqueue 中處理 item 的過程,也是找到當前 deployment 擁有的所有 rs 的過程。

該過程會輪詢所有的 rs,如果有 owner 且是當前 deployment,再判斷 label 是否滿足 deployment 的 selector,滿足則列入結果;若不滿足,啟動棄養機制,僅僅將 rs 的 ownerReferences 刪除,使其成為孤兒,不做其他事情。這也是為什麼修改了 deployment 的 selector 之後,會多一個 replicas!=0 的 rs 的原因。

如果 rs 沒有 owner,是個孤兒,判斷 label 是否滿足 deployment 的 selector,滿足條件,則啟動領養機制,將其 ownerReferences 設置為當前 deployment,再列入結果。

下面是整個過程的源碼:

<code>func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
\tcontrollerRef := metav1.GetControllerOf(obj)
\tif controllerRef != nil {
\t\tif controllerRef.UID != m.Controller.GetUID() {
\t\t\treturn false, nil
\t\t}
\t\tif match(obj) {
\t\t\treturn true, nil
\t\t}

\t\tif m.Controller.GetDeletionTimestamp() != nil {
\t\t\treturn false, nil
\t\t}
\t\tif err := release(obj); err != nil {
\t\t\tif errors.IsNotFound(err) {
\t\t\t\treturn false, nil
\t\t\t}
\t\t\treturn false, err
\t\t}
\t\treturn false, nil
\t}

\tif m.Controller.GetDeletionTimestamp() != nil || !match(obj) {

\t\treturn false, nil
\t}
\tif obj.GetDeletionTimestamp() != nil {
\t\treturn false, nil
\t}

\tif err := adopt(obj); err != nil {
\t\tif errors.IsNotFound(err) {
\t\t\treturn false, nil
\t\t}
\t\treturn false, err
\t}
\treturn true, nil
}
複製代碼/<code>

領養和棄養的函數:

<code>func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *apps.ReplicaSet) error {
\tif err := m.CanAdopt(); err != nil {
\t\treturn fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
\t}

\taddControllerPatch := fmt.Sprintf(
\t\t`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
\t\tm.controllerKind.GroupVersion(), m.controllerKind.Kind,
\t\tm.Controller.GetName(), m.Controller.GetUID(), rs.UID)
\treturn m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, []byte(addControllerPatch))
}

func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.ReplicaSet) error {
\tklog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s",
\t\treplicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
\tdeleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), replicaSet.UID)
\terr := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch))
\tif err != nil {
\t\tif errors.IsNotFound(err) {
\t\t\treturn nil
\t\t}
\t\tif errors.IsInvalid(err) {
\t\t\treturn nil
\t\t}
\t}
\treturn err
}
複製代碼/<code>

rolloutRecreate

如果 deployment 的更新策略是 Recreate,其過程是將舊的 pod 刪除,再啟動新的 pod。具體過程如下:


圖解 Kubernetes Deployment Controller 工作原理與流程


首先根據上一步 rs 的領養和棄養過程,獲得當前 deployment 的所有 rs,排序找出最新的 rs,將其 pod template 與 deployment 的 pod template 比較,若不一致需要創建新的 rs;

創建新的 rs 的過程為:計算當前 deployment 的 pod template 的 hash 值,將其增加至 rs label 及 selector 中;

對所有舊的 rs 計算出最大的 revision,將其加一,作為新 rs 的 revision,為新的 rs 設置如下註解:

<code>"deployment.kubernetes.io/revision"
"deployment.kubernetes.io/desired-replicas"
"deployment.kubernetes.io/max-replicas"
複製代碼/<code>

如果當前 deployment 的 revision 不是最新,將其設為最新;如果需要更新狀態,則更新其狀態;

將舊的 rs 進行降級,即將其副本數設為 0;

判斷當前所有舊的 pod 是否停止,判斷條件為 pod 狀態為 failed 或 succeed,unknown 或其他所有狀態都不是停止狀態;若並非所有 pod 都停止了,則退出本次操作,下一個循環再處理;

若所有 pod 都停止了,將新的 rs 進行升級,即將其副本數置為 deployment 的副本數;

最後進行清理工作,比如舊的 rs 數過多時,刪除多餘的 rs 等。

下面是 rolloutRecreate 的源代碼:

<code>func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
\tnewRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
\tif err != nil {
\t\treturn err
\t}
\tallRSs := append(oldRSs, newRS)
\tactiveOldRSs := controller.FilterActiveReplicaSets(oldRSs)

\tscaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)

\tif err != nil {
\t\treturn err
\t}
\tif scaledDown {
\t\treturn dc.syncRolloutStatus(allRSs, newRS, d)
\t}

\tif oldPodsRunning(newRS, oldRSs, podMap) {
\t\treturn dc.syncRolloutStatus(allRSs, newRS, d)
\t}

\tif newRS == nil {
\t\tnewRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
\t\tif err != nil {
\t\t\treturn err
\t\t}
\t\tallRSs = append(oldRSs, newRS)
\t}

\tif _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
\t\treturn err
\t}

\tif util.DeploymentComplete(d, &d.Status) {
\t\tif err := dc.cleanupDeployment(oldRSs, d); err != nil {
\t\t\treturn err
\t\t}
\t}

\treturn dc.syncRolloutStatus(allRSs, newRS, d)
}
複製代碼/<code>

rolloutRolling

如果 deployment 的更新策略是 Recreate,其過程是將舊的 pod 刪除,再啟動新的 pod。具體過程如下:


圖解 Kubernetes Deployment Controller 工作原理與流程


從上圖可以看到,從開始到創建新的 rs 的過程與 rolloutRecreate 過程一致,唯一區別在於,設置新 rs 副本數的過程。在 rolloutRolling 的過程中,新的 rs 的副本數為 deploy.replicas + maxSurge - currentPodCount。代碼如下:

<code>func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
\tswitch deployment.Spec.Strategy.Type {
\tcase apps.RollingUpdateDeploymentStrategyType:
\t\t// Check if we can scale up.
\t\tmaxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
\t\tif err != nil {
\t\t\treturn 0, err
\t\t}
\t\t// Find the total number of pods
\t\tcurrentPodCount := GetReplicaCountForReplicaSets(allRSs)
\t\tmaxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
\t\tif currentPodCount >= maxTotalPods {
\t\t\t// Cannot scale up.
\t\t\treturn *(newRS.Spec.Replicas), nil
\t\t}
\t\t// Scale up.
\t\tscaleUpCount := maxTotalPods - currentPodCount
\t\t// Do not exceed the number of desired replicas.
\t\tscaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
\t\treturn *(newRS.Spec.Replicas) + scaleUpCount, nil
\tcase apps.RecreateDeploymentStrategyType:
\t\treturn *(deployment.Spec.Replicas), nil
\tdefault:
\t\treturn 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
\t}
}
複製代碼/<code>

然後到了增減新舊 rs 副本數的過程。主要為先 scale up 新 rs,再 scale down 舊 rs。scale up 新 rs 的過程與上述一致;scale down 舊 rs 的過程為先計算一個最大 scale down 副本數,若小於 0 則不做任何操作;然後在 scale down 的時候做了一個優化,先 scale down 不正常的 rs,可以保證先刪除那些不健康的副本;最後如果還有餘額,再 scale down 正常的 rs。

每次 scale down 的副本數為 allAvailablePodCount - minAvailable,即 allAvailablePodCount - (deploy.replicas - maxUnavailable)。代碼如下:

<code>func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
\tmaxUnavailable := deploymentutil.MaxUnavailable(*deployment)

\t// Check if we can scale down.
\tminAvailable := *(deployment.Spec.Replicas) - maxUnavailable
\t// Find the number of available pods.
\tavailablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
\tif availablePodCount <= minAvailable {
\t\t// Cannot scale down.
\t\treturn 0, nil
\t}
\tklog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)

\tsort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

\ttotalScaledDown := int32(0)
\ttotalScaleDownCount := availablePodCount - minAvailable
\tfor _, targetRS := range oldRSs {
\t\tif totalScaledDown >= totalScaleDownCount {
\t\t\t// No further scaling required.
\t\t\tbreak
\t\t}
\t\tif *(targetRS.Spec.Replicas) == 0 {
\t\t\t// cannot scale down this ReplicaSet.
\t\t\tcontinue
\t\t}
\t\t// Scale down.
\t\tscaleDownCount := int32(integer.IntMin(int(*(targetRS.Spec.Replicas)), int(totalScaleDownCount-totalScaledDown)))
\t\tnewReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
\t\tif newReplicasCount > *(targetRS.Spec.Replicas) {
\t\t\treturn 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
\t\t}
\t\t_, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
\t\tif err != nil {
\t\t\treturn totalScaledDown, err
\t\t}

\t\ttotalScaledDown += scaleDownCount
\t}

\treturn totalScaledDown, nil
}
複製代碼/<code>

總結

因為 Kubernetes 採用聲明式 API,因此對於 Controller 來說,所做的事情就是根據當前事件計算一個預期的狀態,並判斷當前實際的狀態是否滿足預期的狀態,如果不滿足,則採取行動使其靠攏。

本文通過對 Deployment Controller 的工作流程進行分析,雖然做的事情比較繁瑣,但是其所做的事情都是圍繞 “向預期靠攏” 這個目標展開的。


分享到:


相關文章: