當使用複雜的分佈式系統時,可能會遇到併發處理的需求。我們知道golang的協程是處理併發的利器之一,加上Golang為靜態類型和編譯型使得其在企業中使用越來越廣泛。Mode.net公司系統每天要處理實時,快速和靈活的以毫秒為單位動態路由數據包的全球專用網絡和數據,需要高度併發的系統,而他們的動態路由就是使用Golang來構建的,本文我們介紹Mode.net在Golang構建分佈式動態路由系統時的經驗教訓。
併發探測鏈接指標
Mode.net的路由系統稱為HALO,是Hop-by-Hop Adaptive Link-State Optimal Routing(逐跳自適應鏈路狀態最佳路由)的前綴字母簡稱。動態路由算法部分依賴於鏈路度量來計算路由表。這些指標由位於每個PoP(存活節點)上的獨立組件收集。PoP是代表網絡中單個路由實體的機器,它們通過鏈接連接並分佈在形成Mode網絡的多個位置。組件使用網絡數據包探測臨近的主機,這些鄰居將回複數據包給探測。鏈路等待的時間值回覆包中得到。由於每個PoP都會有一個以上的鄰居,因此這種探測任務的本質是併發的,需要實時測量每個鄰居鏈路的延遲。為了計算此指標,無法使用順序處理,必須儘快處理每個探針。
序列號和重置
探測組件交換數據包並依靠序列號進行數據包處理。旨在避免處理分組重複或亂序分組。HALO的第一個實現依靠特殊的序列號0來重置序列號。這樣的數字僅在組件初始化期間使用。主要問題是考慮一個始終從0開始的遞增序列號值,組件重新啟動後,可能會發生數據包重新排序,並且數據包可以輕鬆地用重置之前使用的值替換序列號。這樣隨後的數據包將被忽略,直接復位之前使用的序列號。
UDP握手和有限狀態機
有一個問題是組件重新啟動後序列號是否正確一致。有幾種方法可以解決此問題,在討論了可能的選項之後,HALO選擇實現帶有清晰狀態定義的三向握手協議。該握手在初始化期間通過鏈接建立會話。這樣可以確保節點通過同一會話進行通信併為其使用適當的序列號。為了正確實現這一點,必須定義一個具有清晰狀態和過渡的有限狀態機,這樣就能夠正確管理所有握手形成的極端情況。
會話ID由握手初始化程序生成。完整的交換順序如下:
1.發送方發送一個SYN(ID)數據包。
2.接收器存儲接收到的ID併發送SYN-ACK(ID)。
3.發送方接收SYN-ACK(ID)併發出ACK(ID)。它還開始發送從序列號0開始的數據包。
4.接收器檢查最後收到的ID,如果ID匹配,則接受ACK(ID)。它還開始接受序列號為0的數據包。
處理狀態超時
基本上,在每種狀態下,最多都需要處理三種類型的事件:鏈接事件,數據包事件和超時事件。這些事件會同時顯示,因此必須正確處理併發。
鏈接事件是鏈接更新或鏈接更新。這可以啟動鏈接會話或中斷現有會話。
數據包事件是控制數據包(SYN/SYN-ACK/ACK)或只是探測響應。
超時事件是針對當前會話狀態的預定超時到期後觸發的事件。
這方面主要挑戰是如何處理併發超時到期和其他事件。這是一個容易陷入僵局和競爭狀況陷阱的地方。
第一種方法:HALO項目使用的語言是Golang。它確實提供了本機同步機制,例如本機通道和鎖,並且能夠使用輕量級線程(協程)以進行併發處理。
具體處理過程:
首先,設計一個代表會話和超時處理程序的數據結構。
type Session struct {
State SessionState
Id SessionId
RemoteIp string
}
type TimeoutHandler struct {
callback func(Session)
session Session
duration int
timer *timer.Timer
}
會話數據結構使用會話ID,相鄰鏈路IP和當前會話狀態來標識連接會話。
TimeoutHandler包含回調函數,session表示任務運行的會話,持續時間(duration)以及指向已調度計時器的timer指針。
有一個全局映射,該映射將為每個相鄰的鏈接會話存儲計劃的超時處理程序。
SessionTimeout map[Session]*TimeoutHandler
通過以下方法可以註冊和取消超時:
// schedules the timeout callback function.
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {
timeout.callback(timeout.session)
})
}
對於超時的創建和存儲,可以使用如下方法:
func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {
if sessionTimeout[session] == nil {
sessionTimeout[session] := new(TimeoutHandler)
}
timeout = sessionTimeout[session]
timeout.session = session
timeout.callback = callback
timeout.duration = duration
return timeout
}
一旦創建並註冊了超時處理程序,它就會在持續時間秒數之後運行回調。但是,某些事件將要求重新安排超時處理程序(在SYN狀態下發生,即每3秒一次)。
為此,可以讓回調函數重新安排新的超時:
func synCallback(session Session) {
sendSynPacket(session)
// reschedules the same callback.
newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)
newTimeout.Register()
sessionTimeout[state] = newTimeout
}
該回調將在新的超時處理程序中重新安排時間,並更新全局sessionTimeout映射。
數據競爭和引用
一個簡單的測試是檢查計時器到期後是否執行了超時回調。為此,註冊一個超時,在其持續時間內休眠,然後檢查回調操作是否已完成。執行測試後,最好取消預定的超時時間,因此不會在測試之間產生副作用。令人驚訝的是,這個簡單的測試在發現瞭解決方案中的一個錯誤。使用cancel方法取消超時沒有完成其工作。以下事件順序將導致數據爭用情況:
1.有一個計劃的超時處理程序。
2.線程1:
a)收到一個控制數據包,現在要取消註冊的超時並進入下一個會話狀態。 (例如,發送了SYN後收到了SYN-ACK)。
b)調用timeout.Cancel(),它調用了timer.Stop()。(請注意,Golang計時器停止不會阻止已過期的計時器運行。)
3.線程2:
a)在該取消調用之前,計時器已到期,並且回調即將執行。
b)執行回調,它計劃新的超時並更新全局映射。
4.線程1:
a)轉換到新的會話狀態並註冊新的超時,從而更新全局映射。
兩個線程正在同時更新超時映射。最終結果是無法取消已註冊的超時,然後又丟失了對線程2完成的重新安排的超時的引用。這導致處理程序在一段時間內繼續執行和重新安排,並執行了非預期的行為。
鎖也解決不了問題
使用鎖也不能完全解決問題。如果在處理任何事件之前和執行回調之前添加了鎖,它仍然不能阻止過期的回調運行:
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
})
}
和無鎖的區別是全局映射中的更新是同步的,但這不能阻止在調用超時後運行timeout.Cancel(),如果計劃的計時器已過期但未抓住鎖,則情況如此然而。
使用Cancel通道
可以使用cancel通道,而不必依賴timer.Stop()(不會阻止到期的計時器執行),
這是一個略有不同的方法。這樣可以將不再通過回調進行遞歸重新安排,而會註冊一個無限循環,等待cancel信號或超時事件。
新的Register產生一個新的go線程,該線程在超時後運行回調,並在執行前一個超時後安排新的超時。cancel通道返回給調用方,以控制循環應在何時停止。
func (timeout *TimeoutHandler) Register() chan struct{} {
cancelChan := make(chan struct{})
go func () {
select {
case _ =
return
case _ =
func () {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
} ()
}
} ()
return cancelChan
}
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
timeout.cancelChan
}
這種方法為註冊的每個超時提供了一個cancel通道。取消調用將一個空結構發送到通道並觸發取消。但是,這也不能解決先前的問題;超時可能會在通過通道調用Cancel之前以及超時線程獲取鎖之前到期。
對應的解決方案是在鎖之後檢查超時範圍內的cacel通道。
case _ =
func () {
stateLock.Lock()
defer stateLock.Unlock()
select {
case _ =
return
default:
timeout.callback(timeout.session)
}
} ()
}
最後,這可以確保僅在遇到鎖之後才執行回調,並且不會觸發取消。
死鎖
此解決方案似乎有效;但是存在一個潛在的隱患——死鎖。
仔細檢查代碼,考慮併發調用的方法。問題在cancel通道本身。我們將其設置為無緩衝通道,這意味著其發送是阻塞調用。在超時處理程序中調用"取消"後,只有在該處理程序被取消後才能繼續操作。這裡的問題是,當有多個調用到同一取消通道時,取消請求僅使用一次。如果併發事件要取消相同的超時處理程序,例如鏈接斷開或控制數據包事件,則很容易發生這種情況。這將導致死鎖,可能會使應用程序停止。
應對該死鎖問題的解決方案是讓通道緩衝一下,讓發送並不總是阻塞,並且在併發調用的情況下顯式使發送變為非阻塞。這樣可以確保取消發送一次,並且不會阻止後續的取消調用。
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
select {
case timeout.cancelChan
default:
// can't send on the channel, someone has already requested the cancellation.
}
}
結論
實踐中瞭解了在使用併發代碼時出現常見的常見錯誤。由於其不確定性,即使進行大量測試,也很容易發現這些問題。這是HALO在實現中遇到的三個主要問題:
在不同步的情況下更新共享數據
這似乎很明顯,但是如果同時進行的更新發生在不同的位置,則實際上很難發現。結果是數據競爭,由於一個更新會覆蓋另一個更新,對同一數據的多次更新可能導致更新丟失。
在HALO中,正在更新同一共享映射上的計劃超時參考。(有趣的是,如果Go在同一個Map對象上檢測到併發讀/寫操作,會引發致命錯誤,可以嘗試運行Go的數據競爭檢測器)。最終會導致丟失超時引用,並且無法取消給定的超時。不要是可以使用鎖。缺少條件檢查
在不能僅依靠鎖獨佔性的情況下,需要進行條件檢查。想象一個經典的場景,有一個生產者和多個消費者使用一個共享隊列。生產者可以將一項添加到隊列中,並喚醒所有消費者。喚醒調用意味著隊列中有一些數據可用,並且由於隊列是共享的,因此必須通過鎖來同步訪問。每個消費者都有機會遇到鎖;但是,仍然需要檢查隊列中是否有項目。需要進行條件檢查,因為當遇到鎖時還不知道隊列狀態。
在HALO中,超時處理程序收到了來自計時器到期的"喚醒"調用,但是它仍需要檢查是否已向其發送了取消信號,然後才能繼續執行回調。
死鎖
當一個線程被卡住,無限期地等待一個信號喚醒時,就會發生這種情況,但是這個信號永遠不會到達。
在HALO中,由於多次發送調用到一個非緩衝且阻塞的通道導致死鎖,這樣僅在同一通道上完成接收後,發送調用才會返回。超時線程循環迅速在取消通道上接收信號;但是,在接收到第一個信號後,它將中斷環路,並且再也不會從該通道讀取數據。其餘的調用將會被卡住。
為避免這種情況,需要仔細檢查代碼,謹慎處理阻塞調用,並確保不會發生線程飢餓。HALO中解決方法是使取消調用成為非阻塞調用,因為不需要阻塞調用。
閱讀更多 蟲蟲安全 的文章