在 Kubernetes 中通過 Ingress 來暴露服務到集群外部,這個已經是一個很普遍的方式了,而真正扮演請求轉發的角色是背後的 Ingress Controller,比如我們經常使用的 traefik、ingress-nginx 等就是一個 Ingress Controller。本文我們將通過 golang 來實現一個簡單的自定義的 Ingress Controller,可以加深我們對 Ingress 的理解。
概述
我們在 Kubernetes 集群上往往會運行很多無狀態的 Web 應用,一般來說這些應用是通過一個 Deployment 和一個對應的 Service 組成,比如我們在集群上運行一個 whoami 的應用,對應的資源清單如下所示:(whoami.yaml)
apiVersion: apps/v1
kind: Deployment
metadata:
name: whoami
labels:
app: whoami
spec:
replicas: 1
selector:
matchLabels:
app: whoami
template:
metadata:
labels:
app: whoami
spec:
containers:
- name: whoami
image: cnych/whoami
ports:
- containerPort: 80
---
kind: Service
apiVersion: v1
metadata:
name: whoami
spec:
selector:
app: whoami
ports:
- protocol: TCP
port: 80
targetPort: 80
可以直接使用上面的資源清單部署該應用:
$ kubectl apply -f whoami.yaml
通過部署該應用,在 Kubernetes 集群內部我們可以通過地址 whoami.default.svc.cluster.local 來訪問該 Web 應用,但是在集群外部的用戶應該如何來訪問呢?當然我們可以使用 NodePort 類型的 Service 來進行訪問,但是當我們應用越來越多的時候端口的管理也是一個很大的問題,所以一般情況下不採用該方式,之前我們的方法是用 DaemonSet 在每個邊緣節點上運行一個 Nginx 應用:
spec:
hostNetwork: true
containers:
- image: nginx:1.15.3-alpine
name: nginx
ports:
- name: http
containerPort: 80
hostPort: 80
通過設置 hostNetwork:true,容器將綁定節點的80端口,而不僅僅是容器,這樣我們就可以通過節點的公共 IP 地址的 80 端口訪問到 Nginx 應用了。這種方法理論上肯定是有效的,但是有一個最大的問題就是需要創建一個 Nginx 配置文件,如果應用有變更,還需要手動修改配置,不能自動發現和熱更新,這對於大量的應用維護的成本顯然太大。這個時候我們就可以用另外一個 Kubernetes 提供的方案了:Ingress。
Ingress 對象
Kubernetes 內置就支持通過 Ingress 對象將外部的域名映射到集群內部服務,我們可以通過如下的 Ingress 對象來對外暴露服務:
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: whoami
spec:
tls:
- hosts:
- "*.qikqiak.com"
secretName: qikqiak-tls
rules:
- host: who.qikqiak.com
http:
paths:
- path: /
backend:
serviceName: whoami
servicePort: 80
該資源清單聲明瞭如何將 HTTP 請求路由到後端服務:
- 任何到域名 who.qikqiak.com 的請求都將被路由到 whoami 服務後面的 Pod 列表中去。
- 如果是 HTTPS 請求,並且域名匹配 *.qikqiak.com,則對請求使用 qikqiak-tls 這個證書。
這個配置顯然比我們取手動維護 Nginx 的配置要方便太多了,完全就是自動化的。
Ingress Controllers
上面我們聲明的 Ingress 對象,只是一個集群的資源對象而已,並不會去真正處理我們的請求,這個時候我們還必須安裝一個 Ingress Controller,該控制器負責讀取 Ingress 對象的規則並進行真正的請求處理,簡單來說就是 Ingress 對象只是一個聲明,Ingress Controllers 就是真正的實現。
對於 Ingress Controller 有很多種選擇,比如我們前面文章大量提到的 traefik、或者 ingress-nginx 等等,我們可以根據自己的需求選擇合適的 Ingress Controller 安裝即可。
但是實際上,自定義一個 Ingress Controller 也是非常簡單的(當然要支持各種請求特性就需要大量的工作了)。
自定義 Ingress Controller
這裡我們將用 Golang 來自定義一個簡單的 Ingress Controller,自定義的控制器主要需要實現以下幾個功能:
- 通過 Kubernetes API 查詢和監聽 Service、Ingress 以及 Secret 這些對象
- 加載 TLS 證書用於 HTTPS 請求
- 根據加載的 Kubernetes 數據構造一個用於 HTTP 服務的路由,當然該路由需要非常高效,因為所有傳入的流量都將通過該路由
- 在 80 和 443 端口上監聽傳入的 HTTP 請求,然後根據路由查找對應的後端服務,然後代理請求和響應。443 端口將使用 TLS 證書進行安全連接。
下面我們將來依次介紹上面的實現。
Kubernetes 對象查詢
我們可以通過一個 rest 配置然後調用 NewForConfig 來創建一個 Kubernetes 客戶端,由於我們要通過集群內部的 Service 進行服務的訪問,所以不能在集群外部使用,所以不能使用 kubeconfig 的方式來獲取 Config:
config, err := rest.InClusterConfig()
if err != nil {
log.Fatal().Err(err).Msg("get kubernetes configuration failed")
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatal().Err(er).Msg("create kubernetes client failed")
}
然後我們創建一個 Watcher 和 Payload,Watcher 是來負責查詢 Kubernetes 和創建 Payloads 的,Payloads 包含了滿足 HTTP 請求所需要的所有的 Kubernetes 數據:
// 通過 Watcher 加載的 Kubernetes 的數據集合。
type Payload struct {
Ingresses []IngressPayload
TLSCertificates map[string]*tls.Certificate
}
// 一個 IngressPayload 是一個 Ingress 加上他的服務端口。
type IngressPayload struct {
Ingress *extensionsv1beta1.Ingress
ServicePorts map[string]map[string]int
}
另外需要注意除了端口外,Ingress 還可以通過端口名稱來引用後端服務的端口,所以我們可以通過查詢相應的 Service 來填充該數據。
Watcher 主要用來監聽 Ingress、Service、Secret 的變化:
// 在 Kubernetes 集群中監聽 Ingress 對象的 Watcher
type Watcher struct {
client kubernetes.Interface
onChange func(*Payload)
}
只要我們檢測到某些變化,就會調用 onChange 函數。為了實現上面的監聽功能,我們需要使用 k8s.io/client-go/informers 這個包,該包提供了一種類型安全、高效的機制來查詢、list 和 watch Kubernetes 對象,我們只需要為需要的每個對象創建一個 SharedInformerFactory 以及 Listers 即可:
func (w *Watcher) Run(ctx context.Context) error {
factory := informers.NewSharedInformerFactory(w.client, time.Minute)
secretLister := factory.Core().V1().Secrets().Lister()
serviceLister := factory.Core().V1().Services().Lister()
ingressLister := factory.Extensions().V1beta1().Ingresses().Lister()
...
}
然後定義一個 onChange 的本地函數,該函數在檢測到變更時隨時調用。我們這裡在每種類型的變更時每次都從頭開始重新構建所有的內容,暫時還未考慮性能問題。因為 Watcher 和 HTTP 處理程序都在不同的 goroutine 中運行,所以我們基本上可以構建一個有效的負載,而不會影響任何正在進行的請求,當然這是一種簡單粗暴的做法。
我們可以通過從 listing ingresses 對象開始:
ingresses, err := ingressLister.List(labels.Everything())
if err != nil {
log.Error().Err(err).Msg("failed to list ingresses")
return
}
對於每個 ingress 對象,如果有 TLS 規則,則從 secrets 對象中加載證書:
for _, rec := range ingress.Spec.TLS {
if rec.SecretName != "" {
secret, err := secretLister.Secrets(ingress.Namespace).Get(rec.SecretName)
if err != nil {
log.Error().Err(err).Str("namespace", ingress.Namespace).Str("name", rec.SecretName).Msg("unknown secret")
continue
}
cert, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"])
if err != nil {
log.Error().Err(err).Str("namespace", ingress.Namespace).Str("name", rec.SecretName).Msg("invalid tls certificate")
continue
}
payload.TLSCertificates[rec.SecretName] = &cert
}
}
Go 語言已經內置了一些和加密相關的包,可以很簡單的處理 TLS 證書,對於實際的 HTTP 規則,這裡我們添加了一個 addBackend 的輔助函數:
addBackend := func(ingressPayload *IngressPayload, backend extensionsv1beta1.IngressBackend) {
svc, err := serviceLister.Services(ingressPayload.Ingress.Namespace).Get(backend.ServiceName)
if err != nil {
log.Error().Err(err).Str("namespace", ingressPayload.Ingress.Namespace).Str("name", backend.ServiceName).Msg("unknown service")
} else {
m := make(map[string]int)
for _, port := range svc.Spec.Ports {
m[port.Name] = int(port.Port)
}
ingressPayload.ServicePorts[svc.Name] = m
}
}
每個 HTTP 規則和可選的默認規則都會調用該方法:
if ingress.Spec.Backend != nil {
addBackend(&ingressPayload, *ingress.Spec.Backend)
}
for _, rule := range ingress.Spec.Rules {
if rule.HTTP != nil {
continue
}
for _, path := range rule.HTTP.Paths {
addBackend(&ingressPayload, path.Backend)
}
}
然後調用 onChange 回調:
w.onChange(payload)
每當發生更改時,都會調用本地 onChange 函數,最後一步就是啟動我們的 informers:
var wg sync.WaitGroup
wg.Add(1)
go func() {
informer := factory.Core().V1().Secrets().Informer()
informer.AddEventHandler(handler)
informer.Run(ctx.Done())
wg.Done()
}()
wg.Add(1)
go func() {
informer := factory.Extensions().V1beta1().Ingresses().Informer()
informer.AddEventHandler(handler)
informer.Run(ctx.Done())
wg.Done()
}()
wg.Add(1)
go func() {
informer := factory.Core().V1().Services().Informer()
informer.AddEventHandler(handler)
informer.Run(ctx.Done())
wg.Done()
}()
wg.Wait()
我們這裡每個 informer 都使用同一個 handler:
debounced := debounce.New(time.Second)
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
debounced(onChange)
},
UpdateFunc: func(oldObj, newObj interface{}) {
debounced(onChange)
},
DeleteFunc: func(obj interface{}) {
debounced(onChange)
},
}
Debouncing(防抖動) 是一種避免事件重複的方法,我們設置一個小的延遲,如果在達到延遲之前發生了其他事件,則重啟計時器。
路由表
路由表的目標是通過預先計算大部分查詢相關信息來提高查詢效率,這裡我們就需要使用一些高效的數據結構來進行存儲,由於在集群中有大量的路由規則,所以要實現映射查詢既高效又容易理解的最簡單的方法我們能想到的就是使用 Map,Map 可以為我們提供 O(1) 效率的查詢,我們這裡使用 Map 進行初始化查找,如果在後面找到了多個規則,則使用切片來存儲這些規則。
一個路由表由兩個 Map 構成,一個是根據域名映射的證書,一個就是根據域名映射的後端路由表:
type RoutingTable struct {
certificatesByHost map[string]map[string]*tls.Certificate
backendsByHost map[string][]routingTableBackend
}
// NewRoutingTable 創建一個新的路由表
func NewRoutingTable(payload *watcher.Payload) *RoutingTable {
rt := &RoutingTable{
certificatesByHost: make(map[string]map[string]*tls.Certificate),
backendsByHost: make(map[string][]routingTableBackend),
}
rt.init(payload)
return rt
}
此外路由表下面還有兩個主要的方法:
// GetCertificate 獲得一個證書
func (rt *RoutingTable) GetCertificate(sni string) (*tls.Certificate, error) {
hostCerts, ok := rt.certificatesByHost[sni]
if ok {
for h, cert := range hostCerts {
if rt.matches(sni, h) {
return cert, nil
}
}
}
return nil, errors.New("certificate not found")
}
// GetBackend 通過給定的 host 和 path 獲取後端程序
func (rt *RoutingTable) GetBackend(host, path string) (*url.URL, error) {
// strip the port
if idx := strings.IndexByte(host, ':'); idx > 0 {
host = host[:idx]
}
backends := rt.backendsByHost[host]
for _, backend := range backends {
if backend.matches(path) {
return backend.url, nil
}
}
return nil, errors.New("backend not found")
}
其中 GetCertificate 來獲取用於安全連接的 TLS 證書。HTTP 處理程序使用 GetBackend 將請求代理到後端,對於 TLS 證書,我們還有一個 matches 方法來處理通配符證書:
func (rt *RoutingTable) matches(sni string, certHost string) bool {
for strings.HasPrefix(certHost, "*.") {
if idx := strings.IndexByte(sni, '.'); idx >= 0 {
sni = sni[idx+1:]
} else {
return false
}
certHost = certHost[2:]
}
return sni == certHost
}
其實對於後端應用來說,matches 方法實際上就是一個正則表達式匹配(因為 Ingress 對象的 path 字段定義的是一個正則表達式):
type routingTableBackend struct {
pathRE *regexp.Regexp
url *url.URL
}
func (rtb routingTableBackend) matches(path string) bool {
if rtb.pathRE == nil {
return true
}
return rtb.pathRE.MatchString(path)
}
HTTP Server
最後我們需要來實現一個 HTTP Server,用來接收網絡入口的請求。首先定義一個私有的 config 結構體:
type config struct {
host string
port int
tlsPort int
}
定義一個 Option 類型:
// config 的修改器
type Option func(*config)
定義一個設置 Option 的函數:
// WithHost 設置 host 綁定到 config 上。
func WithHost(host string) Option {
return func(cfg *config) {
cfg.host = host
}
}
服務的結構體和構造器如下所示:
// 代理 HTTP 請求
type Server struct {
cfg *config
routingTable atomic.Value
ready *Event
}
// New 創建一個新的服務
func New(options ...Option) *Server {
cfg := defaultConfig()
for _, o := range options {
o(cfg)
}
s := &Server{
cfg: cfg,
ready: NewEvent(),
}
s.routingTable.Store(NewRoutingTable(nil))
return s
}
通過使用一個合適的默認值,上面的初始化方法可以使大多數客戶端使用變得非常容易,同時還可以根據需要進行靈活的更改,這種 API 方法在 Go 語言中是非常普遍的,有很多實際示例,比如 gRPC 的 Dail 方法。
除了配置之外,我們的服務器還有指向路由表的指針和一個就緒的事件,用於在第一次設置 payload 時發出信號。但是需要注意的是,我們這裡使用的是 atomic.Value 來存儲路由表,這是為什麼呢?
由於這裡我們的應用不是線程安全的,如果在 HTTP 處理程序嘗試讀取路由表的同時對其進行了修改,則可能導致狀態錯亂或者程序崩潰。所以,我們需要防止同時讀取和寫入這個共享的數據結構,當然有多種方法可以實現該需求:
- 第一種就是我們這裡使用的 atomic.Value,該類型提供了一個 Load 和 Store 的方法,可以允許我們自動讀取/寫入該值。由於我們在每次更改時都會重新構建路由表,所以我們可以在一次操作中安全地交換新舊路由表,這和文檔中的 ReadMostly 示例非常相似:
不過這種方法的一個缺點是必須在運行時聲明存儲的值類型:
s.routingTable.Load().(*RoutingTable).GetBackend(r.Host, r.URL.Path)
- 另外我們也可以使用 Mutext 或 RWMutex 來控制對關鍵區域代碼的訪問:
// 讀
s.mu.RLock()
backendURL, err := s.routingTable.GetBackend(r.Host, r.URL.Path)
s.mu.RUnlock()
// 寫
rt := NewRoutingTable(payload)
s.mu.Lock()
s.routingTable = rt
s.mu.Unlock()
- 還有一種方法就是讓路由表本身成為線程安全的,使用 sync.Map 來代替 Map 並添加方法來動態更新路由表。一般來說,我會避免使用這種方法,它使代碼更難於理解和維護了,而且如果你實際上最終沒有多個 goroutine 訪問數據結構的話,就會增加不必要的開銷了。
真正的處理服務的 ServeHTTP 方法如下所示:
// ServeHTTP 處理 HTTP 請求
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 根據請求的域名和 Path 路徑獲取背後真實的後端地址
backendURL, err := s.routingTable.Load().(*RoutingTable).GetBackend(r.Host, r.URL.Path)
if err != nil {
http.Error(w, "upstream server not found", http.StatusNotFound)
return
}
log.Info().Str("host", r.Host).Str("path", r.URL.Path).Str("backend", backendURL.String()).Msg("proxying request")
// 對後端真實 URL 發起代理請求
p := httputil.NewSingleHostReverseProxy(backendURL)
p.ErrorLog = stdlog.New(log.Logger, "", 0)
p.ServeHTTP(w, r)
}
這裡我們使用了 httputil 這個包,該包具有反向代理的一些實現方法,我們可以將其用於 HTTP 服務,它可以將請求轉發到指定的 URL 上,然後將響應發送回客戶端。
Main 函數
將所有組件組合在一起,然後通過 main 方法提供入口,我們這裡使用 flag 包來提供一些命令行參數:
func main() {
flag.StringVar(&host, "host", "0.0.0.0", "the host to bind")
flag.IntVar(&port, "port", 80, "the insecure http port")
flag.IntVar(&tlsPort, "tls-port", 443, "the secure https port")
flag.Parse()
client, err := kubernetes.NewForConfig(getKubernetesConfig())
if err != nil {
log.Fatal().Err(err).Msg("failed to create kubernetes client")
}
s := server.New(server.WithHost(host), server.WithPort(port), server.WithTLSPort(tlsPort))
w := watcher.New(client, func(payload *watcher.Payload) {
s.Update(payload)
})
var eg errgroup.Group
eg.Go(func() error {
return s.Run(context.TODO())
})
eg.Go(func() error {
return w.Run(context.TODO())
})
if err := eg.Wait(); err != nil {
log.Fatal().Err(err).Send()
}
}
Kubernetes 配置
有了服務器代碼,現在我們就可以在 Kubernetes 上用 DaemonSet 控制器來運行我們的 Ingress Controller:(k8s-ingress-controller.yaml)
apiVersion: v1
kind: ServiceAccount
metadata:
name: k8s-simple-ingress-controller
namespace: default
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: k8s-simple-ingress-controller
rules:
- apiGroups:
- ""
resources:
- services
- endpoints
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- extensions
resources:
- ingresses
verbs:
- get
- list
- watch
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: k8s-simple-ingress-controller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: k8s-simple-ingress-controller
subjects:
- kind: ServiceAccount
name: k8s-simple-ingress-controller
namespace: default
---
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: k8s-simple-ingress-controller
labels:
app: ingress-controller
spec:
selector:
matchLabels:
app: ingress-controller
template:
metadata:
labels:
app: ingress-controller
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
serviceAccountName: k8s-simple-ingress-controller
containers:
- name: k8s-simple-ingress-controller
image: cnych/k8s-simple-ingress-controller:v0.1
ports:
- name: http
containerPort: 80
- name: https
containerPort: 443
由於我們要在應用中監聽 Ingress、Service、Secret 這些資源對象,所以需要聲明對應的 RBAC 權限,這樣當我們的請求到達 Ingress Controller 的節點後,然後根據 Ingress 對象的規則,將請求轉發到對應的 Service 上就完成了服務暴露的整個過程。
直接創建上面我們自定義的 Ingress Controller 的資源清單:
$ kubectl apply -f k8s-ingress-controller.yaml
$ kubectl get pods -l app=ingress-controller
NAME READY STATUS RESTARTS AGE
k8s-simple-ingress-controller-694df987c7-h2qlc 1/1 Running 0 7m59s
然後為我們最開始的 whoami 服務創建一個 Ingress 對象:(whoami-ingress.yaml)
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: whoami
spec:
rules:
- host: who.qikqiak.com
http:
paths:
- path: /
backend:
serviceName: whoami
servicePort: 80
$ kubectl apply -f whoami-ingress.yaml
後將域名 who.qikqiak.com 解析到我們部署的 Ingress Controller 的 Pod 節點上,就可以直接訪問了:
$ kubectl logs -f k8s-simple-ingress-controller-694df987c7-h2qlc
5:37AM INF starting secure HTTP server addr=0.0.0.0:443
5:37AM INF starting insecure HTTP server addr=0.0.0.0:80
5:39AM INF proxying request backend=http://whoami:80 host=who.qikqiak.com path=/
到這裡我們就完成了自定義一個簡單的 Ingress Controller,當然這只是一個最基礎的功能,在實際使用中還會有更多的需求,比如 TCP 的支持、對請求進行一些修改之類的,這就需要花更多的時間去實現了。
本文相關代碼都整理到了 GitHub 上,地址:https://github.com/cnych/kubernetes-simple-ingress-controller。
參考鏈接
- https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/
- http://www.doxsey.net/blog/how-to-build-a-custom-kubernetes-ingress-controller-in-go
閱讀更多 k8s技術圈 的文章