從Golang實踐中得到的教訓

從Golang實踐中得到的教訓

當使用複雜的分佈式系統時,可能會遇到併發處理的需求。我們知道golang的協程是處理併發的利器之一,加上Golang為靜態類型和編譯型使得其在企業中使用越來越廣泛。Mode.net公司系統每天要處理實時,快速和靈活的以毫秒為單位動態路由數據包的全球專用網絡和數據,需要高度併發的系統,而他們的動態路由就是使用Golang來構建的,本文我們介紹Mode.net在Golang構建分佈式動態路由系統時的經驗教訓。

併發探測鏈接指標

Mode.net的路由系統稱為HALO,是Hop-by-Hop Adaptive Link-State Optimal Routing(逐跳自適應鏈路狀態最佳路由)的前綴字母簡稱。動態路由算法部分依賴於鏈路度量來計算路由表。這些指標由位於每個PoP(存活節點)上的獨立組件收集。PoP是代表網絡中單個路由實體的機器,它們通過鏈接連接並分佈在形成Mode網絡的多個位置。組件使用網絡數據包探測臨近的主機,這些鄰居將回複數據包給探測。鏈路等待的時間值回覆包中得到。由於每個PoP都會有一個以上的鄰居,因此這種探測任務的本質是併發的,需要實時測量每個鄰居鏈路的延遲。為了計算此指標,無法使用順序處理,必須儘快處理每個探針。

從Golang實踐中得到的教訓

序列號和重置

探測組件交換數據包並依靠序列號進行數據包處理。旨在避免處理分組重複或亂序分組。HALO的第一個實現依靠特殊的序列號0來重置序列號。這樣的數字僅在組件初始化期間使用。主要問題是考慮一個始終從0開始的遞增序列號值,組件重新啟動後,可能會發生數據包重新排序,並且數據包可以輕鬆地用重置之前使用的值替換序列號。這樣隨後的數據包將被忽略,直接復位之前使用的序列號。

UDP握手和有限狀態機

有一個問題是組件重新啟動後序列號是否正確一致。有幾種方法可以解決此問題,在討論了可能的選項之後,HALO選擇實現帶有清晰狀態定義的三向握手協議。該握手在初始化期間通過鏈接建立會話。這樣可以確保節點通過同一會話進行通信併為其使用適當的序列號。為了正確實現這一點,必須定義一個具有清晰狀態和過渡的有限狀態機,這樣就能夠正確管理所有握手形成的極端情況。

從Golang實踐中得到的教訓

會話ID由握手初始化程序生成。完整的交換順序如下:

1.發送方發送一個SYN(ID)數據包。

2.接收器存儲接收到的ID併發送SYN-ACK(ID)。

3.發送方接收SYN-ACK(ID)併發出ACK(ID)。它還開始發送從序列號0開始的數據包。

4.接收器檢查最後收到的ID,如果ID匹配,則接受ACK(ID)。它還開始接受序列號為0的數據包。

處理狀態超時

基本上,在每種狀態下,最多都需要處理三種類型的事件:鏈接事件,數據包事件和超時事件。這些事件會同時顯示,因此必須正確處理併發。

鏈接事件是鏈接更新或鏈接更新。這可以啟動鏈接會話或中斷現有會話。

數據包事件是控制數據包(SYN/SYN-ACK/ACK)或只是探測響應。

超時事件是針對當前會話狀態的預定超時到期後觸發的事件。

這方面主要挑戰是如何處理併發超時到期和其他事件。這是一個容易陷入僵局和競爭狀況陷阱的地方。

第一種方法:HALO項目使用的語言是Golang。它確實提供了本機同步機制,例如本機通道和鎖,並且能夠使用輕量級線程(協程)以進行併發處理。

從Golang實踐中得到的教訓


具體處理過程:

首先,設計一個代表會話和超時處理程序的數據結構。

type Session struct {

State SessionState

Id SessionId

RemoteIp string

}

type TimeoutHandler struct {

callback func(Session)

session Session

duration int

timer *timer.Timer

}

會話數據結構使用會話ID,相鄰鏈路IP和當前會話狀態來標識連接會話。

TimeoutHandler包含回調函數,session表示任務運行的會話,持續時間(duration)以及指向已調度計時器的timer指針。

有一個全局映射,該映射將為每個相鄰的鏈接會話存儲計劃的超時處理程序。

SessionTimeout map[Session]*TimeoutHandler

通過以下方法可以註冊和取消超時:

// schedules the timeout callback function.

func (timeout* TimeoutHandler) Register() {

timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {

timeout.callback(timeout.session)

})

}

對於超時的創建和存儲,可以使用如下方法:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {

if sessionTimeout[session] == nil {

sessionTimeout[session] := new(TimeoutHandler)

}


timeout = sessionTimeout[session]

timeout.session = session

timeout.callback = callback

timeout.duration = duration

return timeout

}

一旦創建並註冊了超時處理程序,它就會在持續時間秒數之後運行回調。但是,某些事件將要求重新安排超時處理程序(在SYN狀態下發生,即每3秒一次)。

為此,可以讓回調函數重新安排新的超時:

func synCallback(session Session) {

sendSynPacket(session)

// reschedules the same callback.

newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)

newTimeout.Register()

sessionTimeout[state] = newTimeout

}

該回調將在新的超時處理程序中重新安排時間,並更新全局sessionTimeout映射。

數據競爭和引用

一個簡單的測試是檢查計時器到期後是否執行了超時回調。為此,註冊一個超時,在其持續時間內休眠,然後檢查回調操作是否已完成。執行測試後,最好取消預定的超時時間,因此不會在測試之間產生副作用。令人驚訝的是,這個簡單的測試在發現瞭解決方案中的一個錯誤。使用cancel方法取消超時沒有完成其工作。以下事件順序將導致數據爭用情況:

1.有一個計劃的超時處理程序。

2.線程1:

a)收到一個控制數據包,現在要取消註冊的超時並進入下一個會話狀態。 (例如,發送了SYN後收到了SYN-ACK)。

b)調用timeout.Cancel(),它調用了timer.Stop()。(請注意,Golang計時器停止不會阻止已過期的計時器運行。)

3.線程2:

a)在該取消調用之前,計時器已到期,並且回調即將執行。

b)執行回調,它計劃新的超時並更新全局映射。

4.線程1:

a)轉換到新的會話狀態並註冊新的超時,從而更新全局映射。

兩個線程正在同時更新超時映射。最終結果是無法取消已註冊的超時,然後又丟失了對線程2完成的重新安排的超時的引用。這導致處理程序在一段時間內繼續執行和重新安排,並執行了非預期的行為。

鎖也解決不了問題

使用鎖也不能完全解決問題。如果在處理任何事件之前和執行回調之前添加了鎖,它仍然不能阻止過期的回調運行:

func (timeout* TimeoutHandler) Register() {

timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {

stateLock.Lock()

defer stateLock.Unlock()

timeout.callback(timeout.session)

})

}

和無鎖的區別是全局映射中的更新是同步的,但這不能阻止在調用超時後運行timeout.Cancel(),如果計劃的計時器已過期但未抓住鎖,則情況如此然而。

使用Cancel通道

可以使用cancel通道,而不必依賴timer.Stop()(不會阻止到期的計時器執行),

這是一個略有不同的方法。這樣可以將不再通過回調進行遞歸重新安排,而會註冊一個無限循環,等待cancel信號或超時事件。

新的Register產生一個新的go線程,該線程在超時後運行回調,並在執行前一個超時後安排新的超時。cancel通道返回給調用方,以控制循環應在何時停止。

func (timeout *TimeoutHandler) Register() chan struct{} {

cancelChan := make(chan struct{})

go func () {

select {

case _ =

return

case _ =

func () {

stateLock.Lock()

defer stateLock.Unlock()

timeout.callback(timeout.session)

} ()

}

} ()

return cancelChan

}


func (timeout* TimeoutHandler) Cancel() {

if timeout.cancelChan == nil {

return

}

timeout.cancelChan

}

這種方法為註冊的每個超時提供了一個cancel通道。取消調用將一個空結構發送到通道並觸發取消。但是,這也不能解決先前的問題;超時可能會在通過通道調用Cancel之前以及超時線程獲取鎖之前到期。

對應的解決方案是在鎖之後檢查超時範圍內的cacel通道。

case _ =

func () {

stateLock.Lock()

defer stateLock.Unlock()

select {

case _ =

return

default:

timeout.callback(timeout.session)

}

} ()

}

最後,這可以確保僅在遇到鎖之後才執行回調,並且不會觸發取消。

死鎖

此解決方案似乎有效;但是存在一個潛在的隱患——死鎖。

仔細檢查代碼,考慮併發調用的方法。問題在cancel通道本身。我們將其設置為無緩衝通道,這意味著其發送是阻塞調用。在超時處理程序中調用"取消"後,只有在該處理程序被取消後才能繼續操作。這裡的問題是,當有多個調用到同一取消通道時,取消請求僅使用一次。如果併發事件要取消相同的超時處理程序,例如鏈接斷開或控制數據包事件,則很容易發生這種情況。這將導致死鎖,可能會使應用程序停止。

從Golang實踐中得到的教訓

應對該死鎖問題的解決方案是讓通道緩衝一下,讓發送並不總是阻塞,並且在併發調用的情況下顯式使發送變為非阻塞。這樣可以確保取消發送一次,並且不會阻止後續的取消調用。

func (timeout* TimeoutHandler) Cancel() {

if timeout.cancelChan == nil {

return

}


select {

case timeout.cancelChan

default:

// can't send on the channel, someone has already requested the cancellation.

}

}

結論

實踐中瞭解了在使用併發代碼時出現常見的常見錯誤。由於其不確定性,即使進行大量測試,也很容易發現這些問題。這是HALO在實現中遇到的三個主要問題:

在不同步的情況下更新共享數據

這似乎很明顯,但是如果同時進行的更新發生在不同的位置,則實際上很難發現。結果是數據競爭,由於一個更新會覆蓋另一個更新,對同一數據的多次更新可能導致更新丟失。

在HALO中,正在更新同一共享映射上的計劃超時參考。(有趣的是,如果Go在同一個Map對象上檢測到併發讀/寫操作,會引發致命錯誤,可以嘗試運行Go的數據競爭檢測器)。最終會導致丟失超時引用,並且無法取消給定的超時。不要是可以使用鎖。

從Golang實踐中得到的教訓


缺少條件檢查

在不能僅依靠鎖獨佔性的情況下,需要進行條件檢查。想象一個經典的場景,有一個生產者和多個消費者使用一個共享隊列。生產者可以將一項添加到隊列中,並喚醒所有消費者。喚醒調用意味著隊列中有一些數據可用,並且由於隊列是共享的,因此必須通過鎖來同步訪問。每個消費者都有機會遇到鎖;但是,仍然需要檢查隊列中是否有項目。需要進行條件檢查,因為當遇到鎖時還不知道隊列狀態。

在HALO中,超時處理程序收到了來自計時器到期的"喚醒"調用,但是它仍需要檢查是否已向其發送了取消信號,然後才能繼續執行回調。

從Golang實踐中得到的教訓

死鎖

當一個線程被卡住,無限期地等待一個信號喚醒時,就會發生這種情況,但是這個信號永遠不會到達。

在HALO中,由於多次發送調用到一個非緩衝且阻塞的通道導致死鎖,這樣僅在同一通道上完成接收後,發送調用才會返回。超時線程循環迅速在取消通道上接收信號;但是,在接收到第一個信號後,它將中斷環路,並且再也不會從該通道讀取數據。其餘的調用將會被卡住。

為避免這種情況,需要仔細檢查代碼,謹慎處理阻塞調用,並確保不會發生線程飢餓。HALO中解決方法是使取消調用成為非阻塞調用,因為不需要阻塞調用。


分享到:


相關文章: