熱烈歡迎你,相識是一種緣分,Echa 哥為了你的到來特意準備了一份驚喜,k8s學習資料《 》
在 Kubernetes Master 節點中,有三個重要組件:ApiServer、ControllerManager、Scheduler,它們一起承擔了整個集群的管理工作。本文嘗試梳理清楚 ControllerManager 的工作流程和原理。
什麼是 Controller Manager
根據官方文檔的說法:kube-controller-manager 運行控制器,它們是處理集群中常規任務的後臺線程。
說白了,Controller Manager 就是集群內部的管理控制中心,由負責不同資源的多個 Controller 構成,共同負責集群內的 Node、Pod 等所有資源的管理,比如當通過 Deployment 創建的某個 Pod 發生異常退出時,RS Controller 便會接受並處理該退出事件,並創建新的 Pod 來維持預期副本數。
幾乎每種特定資源都有特定的 Controller 維護管理以保持預期狀態,而 Controller Manager 的職責便是把所有的 Controller 聚合起來:
- 提供基礎設施降低 Controller 的實現複雜度
- 啟動和維持 Controller 的正常運行
可以這麼說,Controller 保證集群內的資源保持預期狀態,而 Controller Manager 保證了 Controller 保持在預期狀態。
Controller 工作流程
在講解 Controller Manager 怎麼為 Controller 提供基礎設施和運行環境之前,我們先了解一下 Controller 的工作流程是什麼樣子的。
從比較高維度的視角看,Controller Manager 主要提供了一個分發事件的能力,而不同的 Controller 只需要註冊對應的 Handler 來等待接收和處理事件。
以 Deployment Controller 舉例,在 pkg/controller/deployment/deployment_controller.go 的 NewDeploymentController 方法中,便包括了 Event Handler 的註冊,對於 Deployment Controller 來說,只需要根據不同的事件實現不同的處理邏輯,便可以實現對相應資源的管理。
<code>dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
\tAddFunc: dc.addDeployment,
\tUpdateFunc: dc.updateDeployment,
\t// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
\tDeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
\tAddFunc: dc.addReplicaSet,
\tUpdateFunc: dc.updateReplicaSet,
\tDeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
\tDeleteFunc: dc.deletePod,
})
複製代碼/<code>
可以看到,在 Controller Manager 的幫助下,Controller 的邏輯可以做的非常純粹,只需要實現相應的 EventHandler 即可,那麼 Controller Manager 都做了哪些具體的工作呢?
Controller Manager 架構
輔助 Controller Manager 完成事件分發的是 client-go,而其中比較關鍵的模塊便是 informer。
kubernetes 在 github 上提供了一張 client-go 的架構圖,從中可以看出,Controller 正是下半部分(CustomController)描述的內容,而 Controller Manager 主要完成的是上半部分。
Informer 工廠
從上圖可以看到 Informer 是一個非常關鍵的 “橋樑” 作用,因此對 Informer 的管理便是 Controller Manager 要做的第一件事。
在 Controller Manager 啟動時,便會創建一個名為 SharedInformerFactory 的單例工廠,因為每個 Informer 都會與 Api Server 維持一個 watch 長連接,所以這個單例工廠通過為所有 Controller 提供了唯一獲取 Informer 的入口,來保證每種類型的 Informer 只被實例化一次。
該單例工廠的初始化邏輯:
<code>// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
\tfactory := &sharedInformerFactory{
\t\tclient: client,
\t\tnamespace: v1.NamespaceAll,
\t\tdefaultResync: defaultResync,
\t\tinformers: make(map[reflect.Type]cache.SharedIndexInformer),
\t\tstartedInformers: make(map[reflect.Type]bool),
\t\tcustomResync: make(map[reflect.Type]time.Duration),
\t}
\t// Apply all options
\tfor _, opt := range options {
\t\tfactory = opt(factory)
\t}
\treturn factory
}
複製代碼/<code>
從上面的初始化邏輯中可以看到,sharedInformerFactory 中最重要的是名為 informers 的 map,其中 key 為資源類型,而 value 便是關注該資源類型的 Informer。每種類型的 Informer 只會被實例化一次,並存儲在 map 中,不同 Controller 需要相同資源的 Informer 時只會拿到同一個 Informer 實例。
對於 Controller Manager 來說,維護所有的 Informer 使其正常工作,是保證所有 Controller 正常工作的基礎條件。sharedInformerFactory 通過該 map 維護了所有的 informer 實例,因此,sharedInformerFactory 也承擔了提供統一啟動入口的職責:
<code>// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh \tf.lock.Lock()
\tdefer f.lock.Unlock()
\tfor informerType, informer := range f.informers {
\t\tif !f.startedInformers[informerType] {
\t\t\tgo informer.Run(stopCh)
\t\t\tf.startedInformers[informerType] = true
\t\t}
\t}
}
複製代碼/<code>
當 Controller Manager 啟動時,最重要的就是通過該工廠的 Start 方法,將所有的 Informer 運行起來。
Informer 的創建
下面看下這些 Informer 是怎麼被創建的。Controller Manager 在 cmd/kube-controller-manager/app/controllermanager.go 的 NewControllerInitializers 函數中初識化了所有的 Controller,因為代碼冗長,這裡僅拿 Deployment Controller 舉例子。
初始化 Deployment Controller 的邏輯在 cmd/kube-controller-manager/app/apps.go 的 startDeploymentController 的函數中:
<code>func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
\tif !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
\t\treturn nil, false, nil
\t}
\tdc, err := deployment.NewDeploymentController(
\t\tctx.InformerFactory.Apps().V1().Deployments(),
\t\tctx.InformerFactory.Apps().V1().ReplicaSets(),
\t\tctx.InformerFactory.Core().V1().Pods(),
\t\tctx.ClientBuilder.ClientOrDie("deployment-controller"),
\t)
\tif err != nil {
\t\treturn nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
\t}
\tgo dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
\treturn nil, true, nil
}
複製代碼/<code>
最關鍵的邏輯在 deployment.NewDeploymentController 上,該函數真正創建了 Deployment Controller,而該創建函數的前三個參數分別為 Deployment、ReplicaSet、Pod 的 Informer。可以看到,Informer 的單例工廠以 ApiGroup 為路徑提供了不同資源的 Informer 創建入口。
不過要注意的是,.Apps().V1().Deployments() 雖然返回的是 deploymentInformer 類型的實例,但是,deploymentInformer 其實並不是一個真正的 Informer(儘管他以 Informer 命名),它只是一個模板類,主要功能是提供關注 Deployment 這一特定資源 Informer 的創建模板:
<code>// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
\treturn &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
複製代碼/<code>
真正創建 Informer 的邏輯是在 deploymentInformer.Informer() 中(client-go/informers/apps/v1/deployment.go),f.defaultInformer 是默認的 Deployment Informer 創建模板方法,通過將資源實例和該模板方法傳入 Informer 工廠的 InformerFor 方法,來創建僅關注 Deployment 資源的 Informer:
<code>func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
\treturn f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
複製代碼/<code>
簡單梳理一下:
- 可以通過 Informer 工廠獲得特定類型的 Informer 模板類(即這裡的 deploymentInformer)
- 真正創建該特定資源 Informer 的是 Informer 模板類的 Informer() 方法
- 而 Informer() 方法只不過是通過 Informer 工廠的 InformerFor 來創建真正的 Informer
這裡用到了模板方法(設計模式),雖然有一點繞口,但可以參考下圖梳理一下,理解關鍵在於 Informer 的 差異化的創建邏輯下放給了模板類:
最後,名為 sharedIndexInformer 的結構體將被實例化,並真正的承擔 Informer 的職責。被註冊到 Informer 工廠 map 中的也是該實例。
Informer 的運行
因為真正的 Informer 實例是一個 sharedIndexInformer 類型的對象,當 Informer 工廠啟動時(執行 Start 方法),被真正運行起來的是 sharedIndexInformer。
sharedIndexInformer 是 client-go 裡的組件,它的 Run 方法雖然短短几十行,但是卻承擔了很多工作。到這裡,才到了 Controller Manager 最有趣的部分。
<code>func (s *sharedIndexInformer) Run(stopCh \tdefer utilruntime.HandleCrash()
\tfifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
\tcfg := &Config{
\t\tQueue: fifo,
\t\tListerWatcher: s.listerWatcher,
\t\tObjectType: s.objectType,
\t\tFullResyncPeriod: s.resyncCheckPeriod,
\t\tRetryOnError: false,
\t\tShouldResync: s.processor.shouldResync,
\t\tProcess: s.HandleDeltas,
\t}
\tfunc() {
\t\ts.startedLock.Lock()
\t\tdefer s.startedLock.Unlock()
\t\ts.controller = New(cfg)
\t\ts.controller.(*controller).clock = s.clock
\t\ts.started = true
\t}()
\t// Separate stop channel because Processor should be stopped strictly after controller
\tprocessorStopCh := make(chan struct{})
\tvar wg wait.Group
\tdefer wg.Wait() // Wait for Processor to stop
\tdefer close(processorStopCh) // Tell Processor to stop
\twg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
\twg.StartWithChannel(processorStopCh, s.processor.run)
\tdefer func() {
\t\ts.startedLock.Lock()
\t\tdefer s.startedLock.Unlock()
\t\ts.stopped = true // Don't want any new listeners
\t}()
\ts.controller.Run(stopCh)
}
複製代碼/<code>
sharedIndexInformer 的啟動邏輯主要做了下面幾件事:
- 創建了名為 fifo 的隊列
- 創建並運行了一個名為 controller 的實例
- 啟動了 cacheMutationDetector
- 啟動了 processor
這幾個名詞(或者說組件)前文並沒有提到過,而這四件事情是 Controller Manager 工作的核心內容,因此下面我會分別介紹。
sharedIndexInformer
sharedIndexInformer 是一個共享的 Informer 框架,不同的 Controller 只需要提供一個模板類(比如上文提到的 deploymentInformer ),便可以創建一個符合自己需求的特定 Informer。
sharedIndexInformer 包含了一堆工具來完成 Informer 的任務,其主要代碼在 client-go/tools/cache/shared_informer.go 中。其創建邏輯也在其中:
<code>// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
\trealClock := &clock.RealClock{}
\tsharedIndexInformer := &sharedIndexInformer{
\t\tprocessor: &sharedProcessor{clock: realClock},
\t\tindexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
\t\tlisterWatcher: lw,
\t\tobjectType: objType,
\t\tresyncCheckPeriod: defaultEventHandlerResyncPeriod,
\t\tdefaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
\t\tcacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
\t\tclock: realClock,
\t}
\treturn sharedIndexInformer
}
複製代碼/<code>
在創建邏輯中,有幾個東西需要留意:
- processor:提供了 EventHandler 註冊和事件分發的功能
- indexer:提供了資源緩存的功能
- listerWatcher:由模板類提供,包含特定資源的 List 和 Watch 方法
- objectType:用來標記關注哪種特定資源類型
- cacheMutationDetector:監控 Informer 的緩存
除此之外,還包含了上文啟動邏輯中提到了 DeltaFIFO 隊列和 controller,下面就分別介紹。
sharedProcessor
processor 是 sharedIndexInformer 中一個非常有趣的組件,Controller Manager 通過一個 Informer 單例工廠來保證不同的 Controller 共享了同一個 Informer,但是不同的 Controller 對該共享的 Informer 註冊的 Handler 不同,那麼 Informer 應該怎麼管理被註冊的 Handler 呢?
processor 便是用來管理被註冊的 Handler 以及將事件分發給不同 Handler 的組件。
<code>type sharedProcessor struct {
\tlistenersStarted bool
\tlistenersLock sync.RWMutex
\tlisteners []*processorListener
\tsyncingListeners []*processorListener
\tclock clock.Clock
\twg wait.Group
}
複製代碼/<code>
sharedProcessor 的工作核心是圍繞著 listeners 這個 Listener 切片展開的。
當我們註冊一個 Handler 到 Informer 時,最終會被轉換為一個名為 processorListener 結構體的實例:
<code>func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
\tret := &processorListener{
\t\tnextCh: make(chan interface{}),
\t\taddCh: make(chan interface{}),
\t\thandler: handler,
\t\tpendingNotifications: *buffer.NewRingGrowing(bufferSize),
\t\trequestedResyncPeriod: requestedResyncPeriod,
\t\tresyncPeriod: resyncPeriod,
\t}
\tret.determineNextResync(now)
\treturn ret
}
複製代碼/<code>
該實例主要包含兩個 channel 和外面註冊的 Handler 方法。而此處被實例化的 processorListener 對象最終會被添加到 sharedProcessor.listeners 列表中:
<code>func (p *sharedProcessor) addListener(listener *processorListener) {
\tp.listenersLock.Lock()
\tdefer p.listenersLock.Unlock()
\tp.addListenerLocked(listener)
\tif p.listenersStarted {
\t\tp.wg.Start(listener.run)
\t\tp.wg.Start(listener.pop)
\t}
}
複製代碼/<code>
如圖所示,Controller 中的 Handler 方法最終會被添加到 Listener 中,而 Listener 將會被 append 到 sharedProcessor 的 Listeners 切片中。
前文提到,sharedIndexInformer 啟動時會將 sharedProcessor 運行起來,而 sharedProcessor 的啟動邏輯便是和這些 listener 有關:
<code>func (p *sharedProcessor) run(stopCh \tfunc() {
\t\tp.listenersLock.RLock()
\t\tdefer p.listenersLock.RUnlock()
\t\tfor _, listener := range p.listeners {
\t\t\tp.wg.Start(listener.run)
\t\t\tp.wg.Start(listener.pop)
\t\t}
\t\tp.listenersStarted = true
\t}()
\t\tp.listenersLock.RLock()
\tdefer p.listenersLock.RUnlock()
\tfor _, listener := range p.listeners {
\t\tclose(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
\t}
\tp.wg.Wait() // Wait for all .pop() and .run() to stop
}
複製代碼/<code>
可以看到,sharedProcessor 啟動時會依次執行 listener 的 run 和 pop 方法,我們現在看下這兩個方法。
listener 的啟動
因為 listener 包含了 Controller 註冊進來的 Handler 方法,因此 listener 最重要的職能就是當事件發生時來觸發這些方法,而 listener.run 就是不停的從 nextCh 這個 channel 中拿到事件並執行對應的 handler:
<code>func (p *processorListener) run() {
\t// this call blocks until the channel is closed. When a panic happens during the notification
\t// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
\t// the next notification will be attempted. This is usually better than the alternative of never
\t// delivering again.
\tstopCh := make(chan struct{})
\twait.Until(func() {
\t\t// this gives us a few quick retries before a long pause and then a few more quick retries
\t\terr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
\t\t\tfor next := range p.nextCh {
\t\t\t\tswitch notification := next.(type) {
\t\t\t\tcase updateNotification:
\t\t\t\t\tp.handler.OnUpdate(notification.oldObj, notification.newObj)
\t\t\t\tcase addNotification:
\t\t\t\t\tp.handler.OnAdd(notification.newObj)
\t\t\t\tcase deleteNotification:
\t\t\t\t\tp.handler.OnDelete(notification.oldObj)
\t\t\t\tdefault:
\t\t\t\t\tutilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
\t\t\t\t}
\t\t\t}
\t\t\t// the only way to get here is if the p.nextCh is empty and closed
\t\t\treturn true, nil
\t\t})
\t\t// the only way to get here is if the p.nextCh is empty and closed
\t\tif err == nil {
\t\t\tclose(stopCh)
\t\t}
\t}, 1*time.Minute, stopCh)
}
複製代碼/<code>
可以看到,listener.run 不停的從 nextCh 這個 channel 中拿到事件,但是 nextCh 這個 channel 裡的事件又是從哪來的呢?listener.pop 的職責便是將事件放入 nextCh 中。
listener.pop 是一段非常精巧和有趣的邏輯:
<code>func (p *processorListener) pop() {
\tdefer utilruntime.HandleCrash()
\tdefer close(p.nextCh) // Tell .run() to stop
\tvar nextCh chan\tvar notification interface{}
\tfor {
\t\tselect {
\t\tcase nextCh \t\t\t// Notification dispatched
\t\t\tvar ok bool
\t\t\tnotification, ok = p.pendingNotifications.ReadOne()
\t\t\tif !ok { // Nothing to pop
\t\t\t\tnextCh = nil // Disable this select case
\t\t\t}
\t\tcase notificationToAdd, ok := \t\t\tif !ok {
\t\t\t\treturn
\t\t\t}
\t\t\tif notification == nil { // No notification to pop (and pendingNotifications is empty)
\t\t\t\t// Optimize the case - skip adding to pendingNotifications
\t\t\t\tnotification = notificationToAdd
\t\t\t\tnextCh = p.nextCh
\t\t\t} else { // There is already a notification waiting to be dispatched
\t\t\t\tp.pendingNotifications.WriteOne(notificationToAdd)
\t\t\t}
\t\t}
\t}
}
複製代碼/<code>
listener 之所以包含了兩個 channel:addCh 和 nextCh,是因為 Informer 無法預知 listener.handler 的事件消費的速度是否大於事件生產的速度,因此添加了一個名為 pendingNotifications 的緩衝隊列來保存未來得及消費的事件。
pop 方法一方面會不停的從 addCh 中獲得最新事件,以保證不會讓生產方阻塞。然後判斷是否存在 buffer,如果存在則把事件添加到 buffer 中,如果不存在則嘗試推給 nextCh。
而另一方面,會判斷 buffer 中是否還有事件,如果還有存量,則不停的傳遞給 nextCh。
pop 方法實現了一個帶 buffer 的分發機制,使得事件可以源源不斷的從 addCh 到 nextCh。但是問題來了,那 addCh 的事件從哪來呢。
其實來源非常簡單,listener 有一個 add 方法,入參是一個事件,該方法會將新事件推入 addCh 中。而調用該 add 方法的是管理所有 listener 的 sharedProcessor。
上面提到過,sharedProcessor 的職責便是管理所有的 Handler 以及分發事件,而真正做分發工作的是 distribute 方法:
<code>func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
\tp.listenersLock.RLock()
\tdefer p.listenersLock.RUnlock()
\tif sync {
\t\tfor _, listener := range p.syncingListeners {
\t\t\tlistener.add(obj)
\t\t}
\t} else {
\t\tfor _, listener := range p.listeners {
\t\t\tlistener.add(obj)
\t\t}
\t}
}
複製代碼/<code>
到目前為止,我們有一部分比較清晰了:
- Controller 將 Handler 註冊給 Informer
- Informer 通過 sharedProcessor 維護了所有的 Handler(listener)
- Informer 收到事件時,通過 sharedProcessor.distribute 將事件分發下去
- Controller 被觸發對應的 Handler 來處理自己的邏輯
那麼剩下的問題就是 Informer 的事件從哪來呢?
DeltaFIFO
在分析 Informer 獲取事件之前,需要提前講一個非常有趣的小工具,就是在 sharedIndexInformer.Run 的時候創建的 fifo 隊列:
<code>fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
複製代碼/<code>
DeltaFIFO 是一個非常有趣的隊列,相關代碼定義在 client-go/tools/cache/delta_fifo.go 中。對於一個隊列來說,最重要的肯定是 Add 方法和 Pop 方法,DeltaFIFO 提供了多個 Add 方法,雖然根據不同的事件類型(add/update/delete/sync)區分不同的方法,但是最終都會執行 queueActionLocked:
<code>// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
\tid, err := f.KeyOf(obj)
\tif err != nil {
\t\treturn KeyError{obj, err}
\t}
\t// If object is supposed to be deleted (last event is Deleted),
\t// then we should ignore Sync events, because it would result in
\t// recreation of this object.
\tif actionType == Sync && f.willObjectBeDeletedLocked(id) {
\t\treturn nil
\t}
\tnewDeltas := append(f.items[id], Delta{actionType, obj})
\tnewDeltas = dedupDeltas(newDeltas)
\tif len(newDeltas) > 0 {
\t\tif _, exists := f.items[id]; !exists {
\t\t\tf.queue = append(f.queue, id)
\t\t}
\t\tf.items[id] = newDeltas
\t\tf.cond.Broadcast()
\t} else {
\t\t// We need to remove this from our map (extra items in the queue are
\t\t// ignored if they are not in the map).
\t\tdelete(f.items, id)
\t}
\treturn nil
}
複製代碼/<code>
queueActionLocked 方法的第一個參數 actionType 便是事件類型:
<code>const (
\tAdded DeltaType = "Added" // watch api 獲得的創建事件
\tUpdated DeltaType = "Updated" // watch api 獲得的更新事件
\tDeleted DeltaType = "Deleted" // watch api 獲得的刪除事件
\tSync DeltaType = "Sync" // 觸發了 List Api,需要刷新緩存
)
複製代碼/<code>
從事件類型以及入隊列方法可以看出,這是一個帶有業務功能的隊列,並不是單純的“先入先出”,入隊列方法中有兩個非常精巧的設計:
- 入隊列的事件會先判斷該資源是否存在未被消費的事件,然後適當處理
- 如果 list 方法時發現該資源已經被刪除了,則不再處理
第二點比較好理解,如果觸發了 List 請求,而且發現要被處理的資源已經被刪除了,則就不需要再入隊列處理。而第一點需要結合出隊列方法一起來看:
<code>func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
\tf.lock.Lock()
\tdefer f.lock.Unlock()
\tfor {
\t\tfor len(f.queue) == 0 {
\t\t\t// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
\t\t\t// When Close() is called, the f.closed is set and the condition is broadcasted.
\t\t\t// Which causes this loop to continue and return from the Pop().
\t\t\tif f.IsClosed() {
\t\t\t\treturn nil, ErrFIFOClosed
\t\t\t}
\t\t\tf.cond.Wait()
\t\t}
\t\tid := f.queue[0]
\t\tf.queue = f.queue[1:]
\t\tif f.initialPopulationCount > 0 {
\t\t\tf.initialPopulationCount--
\t\t}
\t\titem, ok := f.items[id]
\t\tif !ok {
\t\t\t// Item may have been deleted subsequently.
\t\t\tcontinue
\t\t}
\t\tdelete(f.items, id)
\t\terr := process(item)
\t\tif e, ok := err.(ErrRequeue); ok {
\t\t\tf.addIfNotPresent(id, item)
\t\t\terr = e.Err
\t\t}
\t\t// Don't need to copyDeltas here, because we're transferring
\t\t// ownership to the caller.
\t\treturn item, err
\t}
}
複製代碼/<code>
DeltaFIFO 的 Pop 方法有一個入參,即是處理函數,出隊列時,DeltaFIFO 會先根據資源 id 獲得該資源 所有的事件,然後交給處理函數。
工作流程如圖所示:
總體來看,DeltaFIFO 的入隊列方法,會先判斷該資源是否已經在 items 中, 如果已經存在,說明該資源還沒有被消費(還在 queue 中排隊),則直接將事件 append 到 items[resource_id] 中即可。如果發現不在 items 中,便會創建 items[resource_id],並將資源 id append 到 queue 中。
而 DeltaFIFO 出隊列方法,會從 queue 中拿到隊列最前面的資源 id,然後從 items 中拿走該資源所有的事件,最後調用 Pop 方法傳入的 PopProcessFunc 類型的處理函數。
因此,DeltaFIFO 的特點在於,入隊列的是(資源的)事件,而出隊列時是拿到的是最早入隊列的資源的所有事件。這樣的設計保證了不會因為有某個資源瘋狂的製造事件,導致其他資源沒有機會被處理而產生飢餓。
controller
DeltaFIFO 是一個非常重要的組件,真正讓他發揮價值的,便是 Informer 的 controller。
雖然 K8s 源碼中的確用的是 controller 這個詞,但是此 controller 並不是 Deployment Controller 這種資源控制器。而是一個承上啟下的事件控制器(從 API Server 拿到事件,下發給 Informer 進行處理)。
controller 的職責就兩個:
- 通過 List-Watch 從 Api Server 獲得事件、並將該事件推入 DeltaFIFO 中
- 將 sharedIndexInformer 的 HandleDeltas 方法作為參數,來調用 DeltaFIFO 的 Pop 方法
controller 的定義非常簡單,它的核心就是 Reflector:
<code>type controller struct {
\tconfig Config
\treflector *Reflector
\treflectorMutex sync.RWMutex
\tclock clock.Clock
}
複製代碼/<code>
Reflector 的代碼比較繁瑣但是功能比較簡單,就是通過 sharedIndexInformer 裡定義的 listerWatcher 進行 List-Watch,並將獲得的事件推入 DeltaFIFO 中。
controller 啟動之後會先將 Reflector 啟動,然後在執行 processLoop,通過一個死循環,不停的將從 DeltaFIFO 讀出需要處理的資源事件,並交給 sharedIndexInformer 的 HandleDeltas 方法(創建 controller 時賦值給了 config.Process)。
<code>func (c *controller) processLoop() {
\tfor {
\t\tobj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
\t\tif err != nil {
\t\t\tif err == ErrFIFOClosed {
\t\t\t\treturn
\t\t\t}
\t\t\tif c.config.RetryOnError {
\t\t\t\t// This is the safe way to re-enqueue.
\t\t\t\tc.config.Queue.AddIfNotPresent(obj)
\t\t\t}
\t\t}
\t}
}
複製代碼/<code>
如果我們再查看下 sharedIndexInformer 的 HandleDeltas 方法,就會發現整個事件消費流程被打通了:
<code>func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
\ts.blockDeltas.Lock()
\tdefer s.blockDeltas.Unlock()
\t// from oldest to newest
\tfor _, d := range obj.(Deltas) {
\t\tswitch d.Type {
\t\tcase Sync, Added, Updated:
\t\t\tisSync := d.Type == Sync
\t\t\ts.cacheMutationDetector.AddObject(d.Object)
\t\t\tif old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
\t\t\t\tif err := s.indexer.Update(d.Object); err != nil {
\t\t\t\t\treturn err
\t\t\t\t}
\t\t\t\ts.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
\t\t\t} else {
\t\t\t\tif err := s.indexer.Add(d.Object); err != nil {
\t\t\t\t\treturn err
\t\t\t\t}
\t\t\t\ts.processor.distribute(addNotification{newObj: d.Object}, isSync)
\t\t\t}
\t\tcase Deleted:
\t\t\tif err := s.indexer.Delete(d.Object); err != nil {
\t\t\t\treturn err
\t\t\t}
\t\t\ts.processor.distribute(deleteNotification{oldObj: d.Object}, false)
\t\t}
\t}
\treturn nil
}
複製代碼/<code>
前面我們知道了 processor.distribute 方法可以將事件分發給所有 listener,而 controller 會使用 Reflector 從 ApiServer 拿到事件,併入隊列,然後通過 processLoop 從隊列中拿出要處理的資源的所有事件,最後通過 sharedIndexInformer 的 HandleDeltas 方法,調用了 processor.distribute。
因此,我們可以將整個事件流向整理為下圖:
Indexer
以上,我們將事件從接收到分發,中間所有的邏輯已經梳理了一遍,但是在 sharedIndexInformer 的 HandleDeltas 方法中,還有一些邏輯比較令人注意,就是所有的事件都會先對 s.indexer 進行更新,然後在分發。
前面提到 Indexer 是一個線程安全的存儲,作為緩存使用,為了減輕資源控制器(Controller)查詢資源時對 ApiServer 的壓力。
當有任何事件更新時,會先刷新 Indexer 裡的緩存,然後再將事件分發給資源控制器,資源控制器在需要獲得資源詳情的時候,優先從 Indexer 獲得,就可以減少對 APIServer 不必要的查詢請求。
Indexer 存儲的具體實現在 client-go/tools/cache/thread_safe_store.go 中,數據存儲在 threadSafeMap 中:
<code>type threadSafeMap struct {
\tlock sync.RWMutex
\titems map[string]interface{}
\t// indexers maps a name to an IndexFunc
\tindexers Indexers
\t// indices maps a name to an Index
\tindices Indices
}
複製代碼/<code>
從本質上講,threadSafeMap 就是加了一個讀寫鎖的 map。除此之外,還可以定義索引,索引的實現非常有趣,通過兩個字段完成:
- Indexers 是一個 map,定義了若干求索引函數,key 為 indexName,value 為求索引的函數(計算資源的索引值)。
- Indices 則保存了索引值和數據 key 的映射關係,Indices 是一個兩層的 map,第一層的 key 為 indexName,和 Indexers 對應,確定使用什麼方法計算索引值,value 是一個 map,保存了 “索引值-資源key” 的關聯關係。
相關邏輯比較簡單,可以參考下圖:
MutationDetector
sharedIndexInformer 的 HandleDeltas 方法中,除了向 s.indexer 更新的數據之外,還向 s.cacheMutationDetector 更新了數據。
在一開始講到 sharedIndexInformer 啟動時還會啟動一個 cacheMutationDetector,來監控 indexer 的緩存。
因為 indexer 緩存的其實是一個指針,多個 Controller 訪問 indexer 緩存的資源,其實獲得的是同一個資源實例。如果有一個 Controller 並不本分,修改了資源的屬性,勢必會影響到其他 Controller 的正確性。
MutationDetector 的作用正是定期檢查有沒有緩存被修改,當 Informer 接收到新事件時,MutationDetector 會保存該資源的指針(和 indexer 一樣),以及該資源的深拷貝。通過定期檢查指針指向的資源和開始存儲的深拷貝是否一致,便知道被緩存的資源是否被修改。
不過,具體是否啟用監控是受到環境變量 KUBE_CACHE_MUTATION_DETECTOR 影響的,如果不設置該環境變量,sharedIndexInformer 實例化的是 dummyMutationDetector,在啟動後什麼事情也不做。
如果 KUBE_CACHE_MUTATION_DETECTOR 為 true,則 sharedIndexInformer 實例化的是 defaultCacheMutationDetector,該實例會以 1s 為間隔,定期執行檢查緩存,如果發現緩存被修改,則會觸發一個失敗處理函數,如果該函數沒被定義,則會觸發一個 panic。
總結
本文講解的應該算是狹義的 Controller Manager,畢竟沒有包含具體的資源管理器(Controller),而只是講解 Controller Manager 是怎麼 “Manage Controller” 的。
可以看到 Controller Manager 做了很多工作來保證 Controller 可以只專注於處理自己關心的事件,而這些工作的核心就是 Informer,當理解了 Informer 是如何與其他組件協同工作,那麼 Controller Manager 為資源管理器鋪墊了什麼也就瞭然了。
閱讀更多 Echa攻城獅 的文章