Python學習教程:Python 使用 backoff 更優雅的實現輪詢

我們經常在開發中會遇到這樣一種場景,即輪循操作。今天介紹一個Python庫,用於更方便的達到輪循的效果——backoff。

Python學習教程:Python 使用 backoff 更優雅的實現輪詢

backoff 模塊簡介及安裝

這個模塊主要提供了是一個裝飾器,用於裝飾函數,使得它在遇到某些條件時會重試(即反覆執行被裝飾的函數)。通常適用於我們在獲取一些不可靠資源,比如會間歇性故障的資源等。

此外,裝飾器支持正常的同步方法,也支持異步asyncio代碼。

backoff 模塊的安裝也很簡單,通過 pip 即可安裝完成:

pip install backoff
Python學習教程:Python 使用 backoff 更優雅的實現輪詢

backoff 用法及簡單源碼分析

backoff 提供兩個主要的裝飾器,通過 backoff. 調用,通過提示我們可以看到這兩個裝飾器,分別是:

backoff.on_predicate
backoff.on_exception

通過 github 查看 backoff 的源碼,源碼目錄 backoff/_decorator.py,定義如下:

def on_predicate(wait_gen,
predicate=operator.not_,
max_tries=None,
max_time=None,
jitter=full_jitter,
on_success=None,
on_backoff=None,
on_giveup=None,
logger='backoff',
**wait_gen_kwargs):
# 省略具體代碼
# 每個參數的定義在源碼中都給出了明確的解釋
pass
def on_exception(wait_gen,
exception,
max_tries=None,
max_time=None,
jitter=full_jitter,
giveup=lambda e: False,
on_success=None,
on_backoff=None,
on_giveup=None,
logger='backoff',
**wait_gen_kwargs):
# 省略具體代碼
# 每個參數的定義在源碼中都給出了明確的解釋

pass

可以看到,定義了很多的參數,這些參數在源碼中都給出了比較詳細的解釋,這裡做簡單的介紹:

首先,wait_gen:表示每次循環等待的時長,以秒為單位。它的類型是一個生成器,在 backoff 中內置了三個生成器。我們查看下源碼,目錄為 backoff/_wait_gen.py。我們取其中一個的詳細實現來看下:

# 省略實現代碼
# base * factor * n
def expo(base=2, factor=1, max_value=None):
"""Generator for exponential decay.
Args:
base: The mathematical base of the exponentiation operation
factor: Factor to multiply the exponentation by.
max_value: The maximum value to yield. Once the value in the
true exponential sequence exceeds this, the value
of max_value will forever after be yielded.
"""
n = 0
while True:
a = factor * base ** n
if max_value is None or a < max_value:
yield a
n += 1
else:
yield max_value
# 通過斐波那契數列控制
def fibo(max_value=None):
pass
# 常量數值
def constant(interval=1):
pass

從源碼不難看出,通過一些策略,每次 yield 返回不同的數值,這些數值就是重試等待秒數。當然因為這個參數類型是生成器,顯然我們也是可以自定義的。同時我們會發現每個 wait_gen 都是參數控制的,所以我們理應是可以修改這個參數的初始值的。

顯然,wait_gen_kwargs就是用來傳遞這些參數的,它是通過可變關鍵字參數控制的,可以直接用 key=value 的形式進行傳參,簡單示例如下:

@backoff.on_predicate(backoff.constant, interval=5)
def main3():
print("time is {} retry...".format(time.time()))

predict 與 exception。這兩個相對比較簡單,predict 接受一個函數,當這個函數返回 True 時會進行重試,否則停止,同時這個函數接受一個參數,這個參數的值是被裝飾函數的返回值。這個參數的默認值是:operator._not。這個函數的源碼如下:

def not_(a):
"Same as not a."
return not a

所以默認返回的是 not 被裝飾函數的返回值。如果當被裝飾函數並沒有返回值時,返回 True,會進行重試。

示例代碼如下:

import backoff
import time
@backoff.on_predicate(backoff.fibo)
def test2():
print("time is {}, retry...".format(time.time()))
if __name__ == "__main__":
test2()
# 等價於:
# 必須接受一個參數,這個參數的值是被裝飾函數的返回值
def condition(r):

return True

@backoff.on_predicate(backoff.fibo, condition)
def test2():
print("time is {}, retry...".format(time.time()))
if __name__ == "__main__":
test2()

執行結果如下:

$ python3 backoff_test.py
time is 1571801845.834578, retry...
time is 1571801846.121314, retry...
time is 1571801846.229812, retry...
time is 1571801846.533237, retry...
time is 1571801849.460303, retry...
time is 1571801850.8974788, retry...
time is 1571801856.498335, retry...
time is 1571801861.56931, retry...
time is 1571801872.701226, retry...
time is 1571801879.198495, retry...
...

需要注意幾點:

  • 如果自定義這個參數對應的函數,這個函數是需要接受一個參數的,這個參數的值是被裝飾函數的返回值。我們可以通過控制這個返回值來做一些條件判斷,當達到某些特殊條件時重試結束。
  • 示例中 wait_gen 用的是 backoff.fibo,注意觀察輸出的時間單隔,這裡的時間間隔好像並不像我們想象中按 fibo 返回的時間間隔數,實際上如果想達到這個效果,我們需要將 jitter 參數設置為 None,後面介紹 jitter 參數時再做說明。

而 exception 則是接受異常類型的實例,可以是單個異常,也可以是元組形式的多個異常。簡單示例如下:

import time
import random
import backoff
from collections import deque
class MyException(Exception):
def __init__(self, message, status):
super().__init__(message, status)
self.message = message
self.status = status
class MyException2(Exception):
pass
@backoff.on_exception(backoff.expo, (MyException, MyException2))
def main():
random_num = random.randint(0, 9)
print("retry...and random num is {}".format(random_num))
if random_num % 2 == 0:
raise MyException("my exception", int("1000" + str(random_num)))
raise MyException2()

max_tries 與 max_time 也比較簡單,分別代表最大重試次數與最長重試時間。這裡就不做演示了。

@backoff.on_exception 中的 giveup,它接受一個異常實例,通過對這個實例做一些條件判斷,達到判斷是否需要繼續循環的目的。如果返回 True,則結束,反之繼續。默認值一直是返回 False,即會一直循環。示例如下:

import random
import backoff
class MyException(Exception):
def __init__(self, message, status):
super().__init__(message, status)
self.message = message
self.status = status
def exception_status(e):

print('exception status code is {}'.format(e.status))
return e.status % 2 == 0

@backoff.on_exception(backoff.expo, MyException, giveup=exception_status)
def main():
random_num = random.randint(0, 9)
print("retry...and random num is {}".format(random_num))
raise MyException("my exception", int("1000" + str(random_num)))
if __name__ == "__main__":
main()

運行結果:

retry...and random num is 5
exception status code is 10005
retry...and random num is 0
exception status code is 10000
# 會再走一遍 raise 的代碼,所以異常仍然會拋出來
Traceback (most recent call last):
File "backoff_test.py", line 36, in <module>
main()
File "/Users/ruoru/code/python/exercise/.venv/lib/python3.7/site-packages/backoff/_sync.py", line 94, in retry
ret = target(*args, **kwargs)
File "backoff_test.py", line 32, in main
raise MyException("my exception", int("1000" + str(random_num)))
__main__.MyException: ('my exception', 10000)
/<module>

需要注意兩點:

  • 這個參數接受的函數仍然只有一個參數,這個參數的值是一個異常實例對象
  • 從結果我們可以看出,當拋出異常時,會先進入 giveup 接受的函數,如果函數判斷需要 giveup 時,當前的異常仍然會拋出。所以有需要,代碼仍然需要做異常邏輯處理。

on_success、on_backoff 與 on_giveup 這三個是一類的參數,用於做事件處理:

  • on_sucess 事件會比較難理解一點,它表示的是被裝飾函數成功結束輪循則會退出,對於 on_exception 來說即當被裝飾函數沒有發生異常時則會調用 on_success。而對於 on_predicate 來說即是通過 predicate 關鍵字返回為 False 結束循環則會調用。
  • on_backoff 即當程序產生循環時會調用
  • on_giveup 當程序是達到當前可嘗試最大次數後,會調用。對於 on_predicate 如果是通過 max_tries 或者 max_time 會調用,而對於 on_exception ,對於 exception 參數返回 True 時也會調用 on_giveup

總結來說,max_tries 和 max_time 這種直接控制結束的,調用的是 on_giveup,而 exception 參數也是通過返回 True 則程序就結束,它是用來控制程序結束的,所以也會調用 on_giveup。而 predicate 參數返回 True 則程序繼續,它是用來控制程序是否繼續徨的,所以當它結束時,調用的是 on_success。

實驗代碼如下:

'''
@Author: ruoru
@Date: 2019-10-22 15:30:32
@LastEditors: ruoru
@LastEditTime: 2019-10-23 14:37:13
@Description: backoff
'''
import time
import random
import backoff

class MyException(Exception):
def __init__(self, status, message):
super().__init__(status, message)
self.status = status
self.message = message
def backoff_hdlr(details):
print("Backing off {wait:0.1f} seconds afters {tries} tries "
"calling function {target} with args {args} and kwargs "
"{kwargs}".format(**details))
def success_hdlr(details):
print("Success offafters {tries} tries "
"calling function {target} with args {args} and kwargs "
"{kwargs}".format(**details))
def giveup_hdlr(details):
print("Giveup off {tries} tries "
"calling function {target} with args {args} and kwargs "
"{kwargs}".format(**details))
@backoff.on_predicate(
backoff.constant,
# 當 random num 不等 10009 則繼續
# 當 random_num 等於 10009 後,會調用 on_success
lambda x: x != 10009,
on_success=success_hdlr,
on_backoff=backoff_hdlr,
on_giveup=giveup_hdlr,
max_time=2)
def main():
num = random.randint(10000, 10010)
print("time is {}, num is {}, retry...".format(time.time(), num))
return num
@backoff.on_exception(
backoff.constant,
MyException,
# 當 Exception 實例對象的 status 為 10009 成立時退出
# 當條件成立時,調用的是 on_giveup
giveup=lambda e: e.status == 10009,
on_success=success_hdlr,
on_backoff=backoff_hdlr,
on_giveup=giveup_hdlr,
)
def main2():
num = random.randint(10000, 10010)
print("time is {}, num is {}, retry...".format(time.time(), num))
# 如果是通過這個條件成立退出,調用的是 on_success
if num == 10010:

return
raise MyException(num, "hhh")
if __name__ == "__main__":
#main()
main2()

logger 參數,很顯然就是用來控制日誌輸出的,這裡不做詳細介紹。copy 官方文檔的一個示例:

my_logger = logging.getLogger('my_logger')
my_handler = logging.StreamHandler()
my_logger.add_handler(my_handler)
my_logger.setLevel(logging.ERROR)
@backoff.on_exception(backoff.expo,
requests.exception.RequestException,
logger=my_logger)
# ...

最後一個參數,jitter,開始也不是很明白這個參數的作用,文檔的解釋如下:

jitter: A function of the value yielded by wait_gen returning the actual time to wait. This distributes wait times stochastically in order to avoid timing collisions across concurrent clients. Wait times are jittered by default using the full_jitter function. Jittering may be disabled altogether by passing jitter=None.

有點暈,於是去看了下源碼,明白了用法,截取關鍵源碼如下:

# backoff/_decorator.py
def on_predicate(wait_gen,
predicate=operator.not_,
max_tries=None,
max_time=None,
jitter=full_jitter,
on_success=None,
on_backoff=None,
on_giveup=None,
logger='backoff',
**wait_gen_kwargs):
pass # 省略
# 因為沒有用到異步,所以會進到這裡
if retry is None:

retry = _sync.retry_predicate
# backoff/_sync
# 分析可以看到有一句獲取下次 wait 時長
seconds = _next_wait(wait, jitter, elapsed, max_time_)
# backoff/_common
def _next_wait(wait, jitter, elapsed, max_time):
value = next(wait)
try:
if jitter is not None:
seconds = jitter(value)
else:
seconds = value
except TypeError:
warnings.warn(
"Nullary jitter function signature is deprecated. Use "
"unary signature accepting a wait value in seconds and "
"returning a jittered version of it.",
DeprecationWarning,
stacklevel=2,
)
seconds = value + jitter()
# don't sleep longer than remaining alloted max_time
if max_time is not None:
seconds = min(seconds, max_time - elapsed)
return seconds

看前面幾行代碼應該就會比較清晰了,如果 jitter 為 None,則會使用第一個參數返回的 value 值,而如果使用了,則會在這個 value 值上再做一次算法,默認為 full_jitter(value)。backoff/_jitter.py 提供了兩個算法,代碼不長,貼上來看看:

import random
def random_jitter(value):
"""Jitter the value a random number of milliseconds.
This adds up to 1 second of additional time to the original value.
Prior to backoff version 1.2 this was the default jitter behavior.
Args:
value: The unadulterated backoff value.
"""
return value + random.random()
def full_jitter(value):
"""Jitter the value across the full range (0 to value).
This corresponds to the "Full Jitter" algorithm specified in the

AWS blog's post on the performance of various jitter algorithms.
(http://www.awsarchitectureblog.com/2015/03/backoff.html)
Args:
value: The unadulterated backoff value.
"""
return random.uniform(0, value)

到這裡,backoff 的基本用法也就結束了。當然它也支持異步的方法裝飾,用法差不多,這裡不再深入。

更多的Python學習教程也會繼續為大家更新!


分享到:


相關文章: