09.18 Python Web:Flask 异步执行任务

原创: ayuliao 源:hackPython


Python Web:Flask 异步执行任务


简介

Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。

怎么实现呢?

使用线程的方式

当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。

通过 ThreadPoolExecutor 来实现

from
flask
import

Flask
from
time
import
sleep
from
concurrent.futures
import


ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 创建线程池执行器
executor =
ThreadPoolExecutor
(
2
)
app =
Flask
(__name__)
@app
.route(
'/jobs'
)
def
run_jobs():

# 交由线程去执行耗时任务
executor.submit(long_task,
'hello'
,
123
)

return

'long task running.'
# 耗时任务
def
long_task(arg1, arg2):

print
(
"args: %s %s!"
% (arg1, arg2))
sleep(
5
)

print
(
"Task is done!"
)
if
__name__ ==
'__main__'

:
app.run()

当要执行一些比较简单的耗时任务时就可以使用这种方式,如发邮件、发短信验证码等。

但这种方式有个问题,就是前端无法得知任务执行状态。

如果想要前端知道,就需要设计一些逻辑,比如将任务执行状态存储到 redis 中,通过唯一的任务 id 进行标识,然后再写一个接口,通过任务 id 去获取任务的状态,然后让前端定时去请求该接口,从而获得任务状态信息。

全部自己实现就显得有些麻烦了,而 Celery 刚好实现了这样的逻辑,来使用一下。

使用 Celery

为了满足前端可以获得任务状态的需求,可以使用 Celery。

Celery 是实时任务处理与调度的分布式任务队列,它常用于 web 异步任务、定时任务等,后面单独写一篇文章描述 Celery 的架构,这里不深入讨论。

现在我想让前端可以通过一个进度条来判断后端任务的执行情况。使用 Celery 就很容易实现,首先通过 pip 安装 Celery 与 redis,之所以要安装 redis,是因为让 Celery 选择 redis 作为「消息代理 / 消息中间件」。

pip install celery
pip install redis

在 Flask 中使用 Celery 其实很简单,这里先简单的过一下 Flask 中使用 Celery 的整体流程,然后再去实现具体的项目

  • 1. 在 Flask 中初始化 Celery
from
flask
import

Flask
from
celery
import

Celery
app =
Flask
(__name__)
# 配置
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL
app.config[
'CELERY_BROKER_URL'
] =
'redis://localhost:6379/0'
# 要存储 Celery 任务的状态或运行结果时就必须要配置
app.config[
'CELERY_RESULT_BACKEND'
] =
'redis://localhost:6379/0'
# 初始化Celery
celery =
Celery
(app.name, broker=app.config[
'CELERY_BROKER_URL'

])
# 将Flask中的配置直接传递给Celery
celery.conf.update(app.config)

上述代码中,通过 Celery 类初始化 celery 对象,传入的应用名称与消息代理的连接 URL。

  • 2. 通过 celery.task 装饰器装饰耗时任务对应的函数
@celery
.task
def
long_task(arg1, arg2):

# 耗时任务的逻辑

return
result
  • 3.Flask 中定义接口通过异步的方式执行耗时任务
@app
.route(
'/'
, methods=[
'GET'
,
'POST'
])
def
index():
task = long_task.delay(
1
,
2
)

delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行

@app
.route(
'/'
, methods=[
'GET'
,
'POST'
])
def
index():
task = long_task.apply_async(args=[
1
,
2
], countdown=
60
)

delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。

通过这 3 步就可以使用 Celery 了。

接着就具体来实现「让前端可以通过一个进度条来判断后端任务的执行情况」的需求。

# bind为True,会传入self给被装饰的方法
@celery
.task(bind=
True
)
def
long_task(self):
verb = [
'Starting up'

,
'Booting'
,
'Repairing'
,
'Loading'
,
'Checking'
]
adjective = [
'master'
,
'radiant'
,
'silent'
,
'harmonic'
,
'fast'
]
noun = [
'solar array'
,
'particle reshaper'
,
'cosmic ray'
,
'orbiter'
,
'bit'
]
message =
''
total = random.randint(
10
,
50
)

for
i
in
range(total):

if

not
message
or
random.random() <

0.25
:

# 随机的获取一些信息
message =
'{0} {1} {2}...'
.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))

# 更新Celery任务状态
self.update_state(state=
'PROGRESS'
,
meta={
'current'
: i,
'total'
: total,

'status'
: message})
time.sleep(
1
)

# 返回字典

return
{
'current'
:
100
,
'total'
:
100
,
'status'
:
'Task completed!'
,

'result'
:
42
}

上述代码中,celery.task () 装饰器使用了 bind=True 参数,这个参数会让 Celery 将 Celery 本身传入,可以用于记录与更新任务状态。

然后就是一个 for 迭代,迭代的逻辑没什么意义,就是随机从 list 中抽取一些词汇来模拟一些逻辑的运行,为了表示这是耗时逻辑,通过 time.sleep (1) 休眠一秒。

每次获取一次词汇,就通过 self.update_state () 更新 Celery 任务的状态,Celery 包含一些内置状态,如 SUCCESS、STARTED 等等,这里使用了自定义状态「PROGRESS」,除了状态外,还将本次循环的一些信息通过 meta 参数 (元数据) 以字典的形式存储起来。有了这些数据,前端就可以显示进度条了。

定义好耗时方法后,再定义一个 Flask 接口方法来调用该耗时方法

@app
.route(
'/longtask'
, methods=[
'POST'
])
def
longtask():

# 异步调用
task = long_task.apply_async()

# 返回 202,与Location头

return
jsonify({}),
202
, {
'Location'

: url_for(
'taskstatus'
,
task_id=task.id)}

简单而言,前端通过 POST 请求到 /longtask,让后端开始去执行耗时任务。

返回的状态码为 202,202 通常表示一个请求正在进行中,然后还在返回数据包的包头 (Header) 中添加了 Location 头信息,前端可以通过读取数据包中 Header 中的 Location 的信息来获取任务 id 对应的完整 url。

前端有了任务 id 对应的 url 后,还需要提供一个接口给前端,让前端可以通过任务 id 去获取当前时刻任务的具体状态。

@app
.route(
'/status/<task>'
)
def
taskstatus(task_id):
task = long_task.
AsyncResult
(task_id)

if
task.state ==
'PENDING'
:
# 在等待
response = {

'state'
: task.state,

'current'
:
0
,


'total'
:
1
,

'status'
:
'Pending...'
}

elif
task.state !=
'FAILURE'
:
# 没有失败
response = {

'state'
: task.state,
# 状态

# meta中的数据,通过task.info.get()可以获得

'current'
: task.info.get(
'current'
,
0
),
# 当前循环进度

'total'
: task.info.get(
'total'
,
1
),
# 总循环进度

'status'
: task.info.get(
'status'
,
''
)
}

if

'result'

in
task.info:
response[
'result'
] = task.info[
'result'
]

else
:

# 后端执行任务出现了一些问题
response = {

'state'
: task.state,

'current'
:
1
,

'total'
:
1
,

'status'
: str(task.info),
# 报错的具体异常
}

return
jsonify(response)
/<task>

为了可以获得任务对象中的信息,使用任务 id 初始化 AsyncResult 类,获得任务对象,然后就可以从任务对象中获得当前任务的信息。

该方法会返回一个 JSON,其中包含了任务状态以及 meta 中指定的信息,前端可以利用这些信息构建一个进度条。

如果任务在 PENDING 状态,表示该任务还没有开始,在这种状态下,任务中是没有什么信息的,这里人为的返回一些数据。如果任务执行失败,就返回 task.info 中包含的异常信息,此外就是正常执行了,正常执行可以通 task.info 获得任务中具体的信息。

这样,后端的逻辑就处理完成了,接着就来实现前端的逻辑,要实现图形进度条,可以直接使用 nanobar.js,简单两句话就可以实现一个进度条,其官网例子如下:

var
options = {
classname:
'my-class'
,
id:
'my-id'
,

// 进度条要出现的位置
target: document.getElementById(
'myDivId'
)
};
// 初始化进度条对象
var
nanobar =
new

Nanobar
( options );
nanobar.go(
30
);
// 30% 进度条
nanobar.go(
76

);
// 76% 进度条
// 100% 进度条,进度条结束
nanobar.go(
100
);

有了 nanobar.js 就非常简单了。

先定义一个简单的 HTML 界面


Long running task with progress updates


<button>
id
=
"start-bg-job"
>
Start Long Calculation
/<button>



id
=
"progress"
>

通过 JavaScript 实现对后台的请求

// 按钮点击事件
$(
function
() {
$(
'#start-bg-job'
).click(start_long_task);
});
// 请求 longtask 接口

function
start_long_task() {

// 添加元素在html中
div = $(
'
0%
...

'
);
$(
'#progress'
).append(div);

// 创建进度条对象

var
nanobar =
new

Nanobar
({
bg:
'#44f'
,
target: div[
0
].childNodes[
0
]
});

// ajax请求longtask
$.ajax({
type:
'POST'
,
url:
'/longtask'
,

// 获得数据,从响应头中获取Location

success:
function
(data, status, request) {
status_url = request.getResponseHeader(
'Location'
);

// 调用 update_progress() 方法更新进度条
update_progress(status_url, nanobar, div[
0
]);
},
error:
function
() {
alert(
'Unexpected error'
);
}
});
}
// 更新进度条
function
update_progress(status_url, nanobar, status_div) {

// getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/<task>」
$.getJSON(status_url,
function
(data) {

// 计算进度
percent = parseInt(data[
'current'
] *
100
/ data[
'total'
]);

// 更新进度条
nanobar.go(percent);

// 更新文字
$(status_div.childNodes[
1

]).text(percent +
'%'
);
$(status_div.childNodes[
2
]).text(data[
'status'
]);

if
(data[
'state'
] !=
'PENDING'
&& data[
'state'
] !=
'PROGRESS'
) {

if
(
'result'
in data) {

// 展示结果
$(status_div.childNodes[
3
]).text(
'Result: '
+ data[
'result'
]);
}

else
{

// 意料之外的事情发生
$(status_div.childNodes[
3
]).text(
'Result: '
+ data[
'state'
]);
}
}

else
{

// 2秒后再次运行
setTimeout(
function
() {
update_progress(status_url, nanobar, status_div);
},
2000
);
}
});
}
/<task>

可以通过注释阅读代码整体逻辑。

至此,需求实现完了,运行一下。

首先运行 Redis

redis-server

然后运行 celery

celery worker -A app.celery --loglevel=info

最后运行 Flask 项目

python app.py

效果如下:


Python Web:Flask 异步执行任务


最后,我自己是一名从事了多年开发的Python老程序员,辞职目前在做自己的Python私人定制课程,今年年初我花了一个月整理了一份最适合2019年学习的Python学习干货,可以送给每一位喜欢Python的小伙伴,想要获取的可以关注我的头条号并在后台私信我:01,即可免费获取。


分享到:


相關文章: