原創: ayuliao 源:hackPython
簡介
Flask 是 Python 中有名的輕量級同步 web 框架,在一些開發中,可能會遇到需要長時間處理的任務,此時就需要使用異步的方式來實現,讓長時間任務在後臺運行,先將本次請求的響應狀態返回給前端,不讓前端界面「卡頓」,當異步任務處理好後,如果需要返回狀態,再將狀態返回。
怎麼實現呢?
使用線程的方式
當要執行耗時任務時,直接開啟一個新的線程來執行任務,這種方式最為簡單快速。
通過 ThreadPoolExecutor 來實現
from
flask
import
Flask
from
time
import
sleep
from
concurrent.futures
import
ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 創建線程池執行器
executor =
ThreadPoolExecutor
(
2
)
app =
Flask
(__name__)
@app
.route(
'/jobs'
)
def
run_jobs():
# 交由線程去執行耗時任務
executor.submit(long_task,
'hello'
,
123
)
return
'long task running.'
# 耗時任務
def
long_task(arg1, arg2):
(
"args: %s %s!"
% (arg1, arg2))
sleep(
5
)
(
"Task is done!"
)
if
__name__ ==
'__main__'
:
app.run()
當要執行一些比較簡單的耗時任務時就可以使用這種方式,如發郵件、發短信驗證碼等。
但這種方式有個問題,就是前端無法得知任務執行狀態。
如果想要前端知道,就需要設計一些邏輯,比如將任務執行狀態存儲到 redis 中,通過唯一的任務 id 進行標識,然後再寫一個接口,通過任務 id 去獲取任務的狀態,然後讓前端定時去請求該接口,從而獲得任務狀態信息。
全部自己實現就顯得有些麻煩了,而 Celery 剛好實現了這樣的邏輯,來使用一下。
使用 Celery
為了滿足前端可以獲得任務狀態的需求,可以使用 Celery。
Celery 是實時任務處理與調度的分佈式任務隊列,它常用於 web 異步任務、定時任務等,後面單獨寫一篇文章描述 Celery 的架構,這裡不深入討論。
現在我想讓前端可以通過一個進度條來判斷後端任務的執行情況。使用 Celery 就很容易實現,首先通過 pip 安裝 Celery 與 redis,之所以要安裝 redis,是因為讓 Celery 選擇 redis 作為「消息代理 / 消息中間件」。
pip install celery
pip install redis
在 Flask 中使用 Celery 其實很簡單,這裡先簡單的過一下 Flask 中使用 Celery 的整體流程,然後再去實現具體的項目
1. 在 Flask 中初始化 Celeryfrom
flask
import
Flask
from
celery
import
Celery
app =
Flask
(__name__)
# 配置
# 配置消息代理的路徑,如果是在遠程服務器上,則配置遠程服務器中redis的URL
app.config[
'CELERY_BROKER_URL'
] =
'redis://localhost:6379/0'
# 要存儲 Celery 任務的狀態或運行結果時就必須要配置
app.config[
'CELERY_RESULT_BACKEND'
] =
'redis://localhost:6379/0'
# 初始化Celery
celery =
Celery
(app.name, broker=app.config[
'CELERY_BROKER_URL'
])
# 將Flask中的配置直接傳遞給Celery
celery.conf.update(app.config)
上述代碼中,通過 Celery 類初始化 celery 對象,傳入的應用名稱與消息代理的連接 URL。
2. 通過 celery.task 裝飾器裝飾耗時任務對應的函數@celery
.task
def
long_task(arg1, arg2):
# 耗時任務的邏輯
return
result
3.Flask 中定義接口通過異步的方式執行耗時任務
@app
.route(
'/'
, methods=[
'GET'
,
'POST'
])
def
index():
task = long_task.delay(
1
,
2
)
delay () 方法是 applyasync () 方法的快捷方式,applyasync () 參數更多,可以更加細緻的控制耗時任務,比如想要 long_task () 在一分鐘後再執行
@app
.route(
'/'
, methods=[
'GET'
,
'POST'
])
def
index():
task = long_task.apply_async(args=[
1
,
2
], countdown=
60
)
delay () 與 apply_async () 會返回一個任務對象,該對象可以獲取任務的狀態與各種相關信息。
通過這 3 步就可以使用 Celery 了。
接著就具體來實現「讓前端可以通過一個進度條來判斷後端任務的執行情況」的需求。
# bind為True,會傳入self給被裝飾的方法
@celery
.task(bind=
True
)
def
long_task(self):
verb = [
'Starting up'
,
'Booting'
,
'Repairing'
,
'Loading'
,
'Checking'
]
adjective = [
'master'
,
'radiant'
,
'silent'
,
'harmonic'
,
'fast'
]
noun = [
'solar array'
,
'particle reshaper'
,
'cosmic ray'
,
'orbiter'
,
'bit'
]
message =
''
total = random.randint(
10
,
50
)
for
i
in
range(total):
if
not
message
or
random.random() <
0.25
:
# 隨機的獲取一些信息
message =
'{0} {1} {2}...'
.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
# 更新Celery任務狀態
self.update_state(state=
'PROGRESS'
,
meta={
'current'
: i,
'total'
: total,
'status'
: message})
time.sleep(
1
)
# 返回字典
return
{
'current'
:
100
,
'total'
:
100
,
'status'
:
'Task completed!'
,
'result'
:
42
}
上述代碼中,celery.task () 裝飾器使用了 bind=True 參數,這個參數會讓 Celery 將 Celery 本身傳入,可以用於記錄與更新任務狀態。
然後就是一個 for 迭代,迭代的邏輯沒什麼意義,就是隨機從 list 中抽取一些詞彙來模擬一些邏輯的運行,為了表示這是耗時邏輯,通過 time.sleep (1) 休眠一秒。
每次獲取一次詞彙,就通過 self.update_state () 更新 Celery 任務的狀態,Celery 包含一些內置狀態,如 SUCCESS、STARTED 等等,這裡使用了自定義狀態「PROGRESS」,除了狀態外,還將本次循環的一些信息通過 meta 參數 (元數據) 以字典的形式存儲起來。有了這些數據,前端就可以顯示進度條了。
定義好耗時方法後,再定義一個 Flask 接口方法來調用該耗時方法
@app
.route(
'/longtask'
, methods=[
'POST'
])
def
longtask():
# 異步調用
task = long_task.apply_async()
# 返回 202,與Location頭
return
jsonify({}),
202
, {
'Location'
: url_for(
'taskstatus'
,
task_id=task.id)}
簡單而言,前端通過 POST 請求到 /longtask,讓後端開始去執行耗時任務。
返回的狀態碼為 202,202 通常表示一個請求正在進行中,然後還在返回數據包的包頭 (Header) 中添加了 Location 頭信息,前端可以通過讀取數據包中 Header 中的 Location 的信息來獲取任務 id 對應的完整 url。
前端有了任務 id 對應的 url 後,還需要提供一個接口給前端,讓前端可以通過任務 id 去獲取當前時刻任務的具體狀態。
@app
.route(
'/status/<task>'
)
def
taskstatus(task_id):
task = long_task.
AsyncResult
(task_id)
if
task.state ==
'PENDING'
:
# 在等待
response = {
'state'
: task.state,
'current'
:
0
,
'total'
:
1
,
'status'
:
'Pending...'
}
elif
task.state !=
'FAILURE'
:
# 沒有失敗
response = {
'state'
: task.state,
# 狀態
# meta中的數據,通過task.info.get()可以獲得
'current'
: task.info.get(
'current'
,
0
),
# 當前循環進度
'total'
: task.info.get(
'total'
,
1
),
# 總循環進度
'status'
: task.info.get(
'status'
,
''
)
}
if
'result'
in
task.info:
response[
'result'
] = task.info[
'result'
]
else
:
# 後端執行任務出現了一些問題
response = {
'state'
: task.state,
'current'
:
1
,
'total'
:
1
,
'status'
: str(task.info),
# 報錯的具體異常
}
return
jsonify(response)
/<task>
為了可以獲得任務對象中的信息,使用任務 id 初始化 AsyncResult 類,獲得任務對象,然後就可以從任務對象中獲得當前任務的信息。
該方法會返回一個 JSON,其中包含了任務狀態以及 meta 中指定的信息,前端可以利用這些信息構建一個進度條。
如果任務在 PENDING 狀態,表示該任務還沒有開始,在這種狀態下,任務中是沒有什麼信息的,這裡人為的返回一些數據。如果任務執行失敗,就返回 task.info 中包含的異常信息,此外就是正常執行了,正常執行可以通 task.info 獲得任務中具體的信息。
這樣,後端的邏輯就處理完成了,接著就來實現前端的邏輯,要實現圖形進度條,可以直接使用 nanobar.js,簡單兩句話就可以實現一個進度條,其官網例子如下:
var
options = {
classname:
'my-class'
,
id:
'my-id'
,
// 進度條要出現的位置
target: document.getElementById(
'myDivId'
)
};
// 初始化進度條對象
var
nanobar =
new
Nanobar
( options );
nanobar.go(
30
);
// 30% 進度條
nanobar.go(
76
);
// 76% 進度條
// 100% 進度條,進度條結束
nanobar.go(
100
);
有了 nanobar.js 就非常簡單了。
先定義一個簡單的 HTML 界面
Long running task with progress updates
<button>
id
=
"start-bg-job"
>
Start Long Calculation
/<button>
id
=
"progress"
>
通過 JavaScript 實現對後臺的請求
// 按鈕點擊事件
$(
function
() {
$(
'#start-bg-job'
).click(start_long_task);
});
// 請求 longtask 接口
function
start_long_task() {
// 添加元素在html中
div = $(
'
);
$(
'#progress'
).append(div);
// 創建進度條對象
var
nanobar =
new
Nanobar
({
bg:
'#44f'
,
target: div[
0
].childNodes[
0
]
});
// ajax請求longtask
$.ajax({
type:
'POST'
,
url:
'/longtask'
,
// 獲得數據,從響應頭中獲取Location
success:
function
(data, status, request) {
status_url = request.getResponseHeader(
'Location'
);
// 調用 update_progress() 方法更新進度條
update_progress(status_url, nanobar, div[
0
]);
},
error:
function
() {
alert(
'Unexpected error'
);
}
});
}
// 更新進度條
function
update_progress(status_url, nanobar, status_div) {
// getJSON()方法是JQuery內置方法,這裡向Location中對應的url發起請求,即請求「/status/<task>」
$.getJSON(status_url,
function
(data) {
// 計算進度
percent = parseInt(data[
'current'
] *
100
/ data[
'total'
]);
// 更新進度條
nanobar.go(percent);
// 更新文字
$(status_div.childNodes[
1
]).text(percent +
'%'
);
$(status_div.childNodes[
2
]).text(data[
'status'
]);
if
(data[
'state'
] !=
'PENDING'
&& data[
'state'
] !=
'PROGRESS'
) {
if
(
'result'
in data) {
// 展示結果
$(status_div.childNodes[
3
]).text(
'Result: '
+ data[
'result'
]);
}
else
{
// 意料之外的事情發生
$(status_div.childNodes[
3
]).text(
'Result: '
+ data[
'state'
]);
}
}
else
{
// 2秒後再次運行
setTimeout(
function
() {
update_progress(status_url, nanobar, status_div);
},
2000
);
}
});
}
/<task>
可以通過註釋閱讀代碼整體邏輯。
至此,需求實現完了,運行一下。
首先運行 Redis
redis-server
然後運行 celery
celery worker -A app.celery --loglevel=info
最後運行 Flask 項目
python app.py
效果如下:
最後,我自己是一名從事了多年開發的Python老程序員,辭職目前在做自己的Python私人定製課程,今年年初我花了一個月整理了一份最適合2019年學習的Python學習乾貨,可以送給每一位喜歡Python的小夥伴,想要獲取的可以關注我的頭條號並在後臺私信我:01,即可免費獲取。