用Python動手實現一個簡單的Celery,並實現異步任務

簡介


Celery 是一個由 Python 實現的分佈式任務隊列,任務隊列通常有 3 個方面的功能。

  • 1. 減緩高併發壓力,先將任務寫入隊列,有空餘資源再運行
  • 2. 執行定時任務,先將任務寫入隊列,指定時間下再執行
  • 3. 異步任務,web 中存在耗時任務可以先將其寫入隊列,然後後臺任務進程去執行


已經有很多文章來描述 Celery 的用法與簡單原理,本篇文章也會簡單提及,但不會費太多筆墨。

本篇重點在於,利用 Python 動手實現一個簡單的 Celery,並使用自己實現的 Celery 實現異步任務,與上一篇「Python Web:Flask 異步執行任務」一樣,通過 Flask 構建一個簡單的 web,然後執行耗時任務,希望前端可以通過進度條顯示任務的進度。

需注意,這裡不會去解讀 Celery 的源碼,其源碼具有很多工程細節,比較複雜,這裡只是從其本質出發,簡單的實現一個玩具 Celery,這個玩具 Celery 在穩定性、效率等方面當然不能與 Celery 相比,但可以很好的理解 Celery 大體是怎麼實現的。

本文講究的是「形離神合」,與 Celery 實現細節不同,但本質原理相同。

那我們開始吧!

Celery 的概念與原理

Celery 5 個關鍵的概念,弄明白,就大致理解 Celery 了。

1.Task (任務)

簡單而言就是你要做的事情,如用戶註冊流程中的發送郵件

2.Worker (工作者)

在後臺處理 Task 的人

3.Broker (經紀人)

本質是一種隊列,Task 會交給 Broker ,Worker 會從 Broker 中取 Task ,並處理

4.Beat

定時任務調度器,根據定的時間,向 Broker 中添加數據,然後等待 Worker 去處理

5.Backend

用於保存 Worker 執行結果的對象,每個 Task 都要有返回值,這些返回值,就在 Backend 中


用Python動手實現一個簡單的Celery,並實現異步任務



這裡我們拋開這裡的各種概念,從更本質的角度來看 Celery,發現它就一個任務序列化存儲與反序列化獲取的過程。

以 Web 異步任務為例,使用方式通常為:

  • 1. 有一個要長時間處理 I/O 的函數,如果不將其異步執行就會產生的阻塞,這通常是不被允許的
  • 2. 啟動一個後臺任務執行進程
  • 3. 當要執行耗時函數時,不會立刻同步運行,而是提取函數的關鍵數據,將其序列化存儲到隊列中,隊列可以使用 Redis 或其他方式實現
  • 4. 後臺任務執行進程會從隊列中獲取數據,並將其反序列化還原
  • 5. 後臺任務執行進程會使用原來的函數以及還原的數據完成函數的執行,從而實現異步執行的效果。


流程並不複雜,Celery 中不同的概念分別負責上面流程中的不同部分。

實現一個簡單的 Celery


接著我們來實現一個 Celery,這裡 Celery 選擇 Redis 作為後端。

先來整理一個大體的框架。

首先我們需要定義一個 Task 類來表示要執行的任務,不同的任務要執行的具體邏輯由使用者自身編寫。

接著要定義一個任務隊列,即 Celery 中的 Broker,用於存儲要執行的任務。

隨後要定義執行進程 Worker,Worker 要從 Broker 中獲取任務並去執行。

最後還需要定義一個用於存儲任務返回數據的類,即 Celery 中的 Backend。

看上去有點複雜,不慌,其實很簡單。

實現任務類

首先來實現 task.py,用於定義任務相關的一些邏輯

# task.py
import
abc
import
json
import
uuid
import
traceback
import
pickle
from
broker
import

Broker
from
backend
import

Backend
class

BaseTask
(abc.ABC):

"""
Example Usage:
class AdderTask(BaseTask):
task_name = "AdderTask"
def run(self, a, b):
result = a + b
return result
adder = AdderTask()
adder.delay(9, 34)
"""
task_name =
None

def
__init__(self):

if

not
self.task_name:

raise

ValueError
(
"task_name should be set"
)
self.broker =
Broker
()

@abc
.abstractmethod
# abstractmethod 派生類必須重寫實現邏輯

def
run(self, *args, **kwargs):

# 寫上你具體的邏輯

raise

NotImplementedError
(
"Task `run` method must be implemented."
)

# 更新任務狀態

def
update_state(self, task_id, state, meta={}):
_task = {
"state"
: state,
"meta"
: meta}
serialized_task = json.dumps(_task)
backend =
Backend
()
backend.enqueue(queue_name=task_id, item=serialized_task)

print
(f
"task info: {task_id} succesfully queued"
)

# 異步執行

def
delay(self, *args, **kwargs):


try
:
self.task_id = str(uuid.uuid4())
_task = {
"task_id"
: self.task_id,
"args"
: args,
"kwargs"
: kwargs}
serialized_task = json.dumps(_task)

# 加入redis中
self.broker.enqueue(queue_name=self.task_name, item=serialized_task)

print
(f
"task: {self.task_id} succesfully queued"
)

except

Exception
:

# traceback.print_exc()

raise

Exception
(
"Unable to publish task to the broker."
)

return
self.task_id
# 獲取數據
def
async_result(task_id):
backend =
Backend
()
_dequeued_item = backend.dequeue(queue_name=task_id)
dequeued_item = json.loads(_dequeued_item)
state = dequeued_item[
"state"
]
meta = dequeued_item[
"meta"

]

class

Info
():

def
__init__(self, state, meta):
self.state = state
self.meta = meta
info =
Info
(state, meta)

return
info


上述代碼中,定義了 BaseTask 類,它繼承自 python 的 abc.ABC 成為一個抽象基類,其中一開始便要求必須定義 taskname,這是因為後面我們需要通過 taskname 去找對應的任務隊列。

BaseTask 類的 run () 方法被 abc.abstractmethod 裝飾,該裝飾器會要求 BaseTask 的派生類必須重寫 run () 方法,這是為了讓使用者可以自定義自己的任務邏輯。

BaseTask 類的 updatestate () 方法用於更新任務的狀態,其邏輯很簡單,先將參數進行 JSON 序列化,然後調用 Backend 的 enqueue () 方法將數據存入,這裡的 Backend 其實是 Redis 實例,enqueue () 方法會將數據寫入 Redis 的 list 中,需要注意,這裡 list 的 key 為 taskid,即當前任務的 id。

BaseTask 類的 delay () 方法用於異步執行任務,首先通過 uuid 為任務創建一個唯一 id,然後將方法的參數通過 JSON 序列化,然後調用 Broker 的 enqueue () 將數據存入,這裡的 Broker 其實也是一個 Redis 實例,enqueue () 方法同樣是將數據寫入到 Redis 的 list 中,只是 list 的 key 為 task_name,即當前任務的名稱。

此外還實現了 async_result () 方法,該方法用於異步獲取任務的數據,通過該方法可以獲得任務的執行結果,或任務執行中的各種數據,數據的結構是有簡單約定的,必須要有 state 表示當然任務的狀態,必須要有 meta 表示當前任務的一些信息。

實現 Broker 與 Backend

在 task.py 中使用了 Broker 與 Backend,那接著就來實現一下這兩個,先實現 Broker。

# broker.py
import
redis
# pip install redis
class

Broker
:

"""
use redis as our broker.
This implements a basic FIFO queue using redis.
"""

def
__init__(self):
host =
"localhost"
port =
6379
password =
None
self.redis_instance = redis.
StrictRedis
(
host=host, port=port, password=password, db=
0
, socket_timeout=
8.0

)

def
enqueue(self, item, queue_name):
self.redis_instance.lpush(queue_name, item)

def
dequeue(self, queue_name):
dequed_item = self.redis_instance.brpop(queue_name, timeout=
3
)

if

not
dequed_item:

return

None
dequed_item = dequed_item[
1
]

return
dequed_item


沒什麼可講的,就是定了兩個方法用於數據的存儲與讀取,存儲使用 lpush 方法,它會將數據從左邊插入到 Redis 的 list 中,讀取數據使用 brpop 方法,它會從 list 的右邊取出第一個元素,返回該元素值並從 list 刪除,左進右出就構成了一個隊列。

為了簡便,Backend 的代碼與 Broker 一模一樣,只是用來存儲任務的信息而已,代碼就不貼了。

後臺任務執行進程 Worker

接著來實現後臺任務執行進程 Worker。

# worker.py
import
json
class

Worker
:

"""
Example Usage:
task = AdderTask()
worker = Worker(task=task)
worker.start()
"""

def
__init__(self, task) ->
None
:
self.task = task

def
start(self,):

while

True
:

try
:
_dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
dequeued_item = json.loads(_dequeued_item)
task_id = dequeued_item[
"task_id"
]
task_args = dequeued_item[
"args"
]
task_kwargs = dequeued_item[
"kwargs"
]
task_kwargs[
'task_id'
] = task_id

self.task.run(*task_args, **task_kwargs)

print
(
"succesful run of task: {0}"
.format(task_id))

except

Exception
:

print
(
"Unable to execute task."
)

continue

上述代碼中,定義了 Worker 類,Worker 類在初始化時需要指定具體的任務實例,然後從 broker 中獲取任務相關的數據,接著調用其中的 run () 方法完成任務的執行,比較簡單。

使用玩具 Celery

玩具 Celery 的關鍵結構都定義好了,接著就來使用一下它,這裡依舊會使用「Python Web:Flask 異步執行任務」文章中的部分代碼,如前端代碼,這裡也不再討論其前端代碼,沒有閱讀可以先閱讀一下,方便理解下面的內容。

首先定義出一個耗時任務

# app.py
class

LongTask
(
BaseTask
):
task_name =
"LongTask"

def
run(self, task_id):

"""Background task that runs a long function with progress reports."""
verb = [
'Starting up'
,
'Booting'
,
'Repairing'
,
'Loading'
,
'Checking'
]
adjective = [
'master'
,
'radiant'
,
'silent'
,
'harmonic'
,
'fast'
]
noun = [
'solar array'
,
'particle reshaper'
,
'cosmic ray'
,
'orbiter'
,
'bit'
]
message =
''
total = random.randint(
10
,
50

)

for
i
in
range(total):

if

not
message
or
random.random() <
0.25
:
message =
'{0} {1} {2}...'
.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(task_id=task_id, state=
'PROGRESS'
,
meta={
'current'
: i,
'total'
: total,

'status'
: message})
time.sleep(
1
)
self.update_state(task_id=task_id, state=
'FINISH'
, meta={
'current'
:
100
,
'total'
:
100
,
'status'
:
'Task completed!'
,
'result'

:
32
})

return


每個耗時任務都要繼承在 BaseTask 並且重寫其 run () 方法,run () 方法中的邏輯就是當前這個耗時任務要處理的具體邏輯。

這裡邏輯其實很簡單,就是隨機的從幾個列表中抽取詞彙而已。

在 for 迭代中,想要前端知道當前任務 for 迭代的具體情況,就需要將相應的數據通過 BaseTask 的 updatestate () 方法將其更新到 backend 中,使用 taskid 作為 Redis 中 list 的 key。

當邏輯全部執行完後,將狀態為 FINISH 的信息存入 backend 中。

寫一個接口來觸發這個耗時任務

# app.py
@app
.route(
'/longtask'
, methods=[
'POST'
])
def
longtask():
long_task =
LongTask
()
task_id = long_task.delay()

return

jsonify({}),
202
, {
'Location'
: url_for(
'taskstatus'
,
task_id=task_id)}


邏輯非常簡單,實例化 LongTask (),並調用其中的 delay () 方法,該方法會將當前任務存入認為隊列中,當前的請求會將當前任務的 task_id 通過響應包頭的中的 taskstatus 字段傳遞給前端。

前端獲取到後,就可以通過 task_id 去獲取當前任務執行狀態等信息,從而實現前端的可視化。

接著定義相應的接口來獲取當前任務的信息,調用用 async_result () 方法,將 taskid 傳入則可。

# app.py
@app
.route(
'/status/<task>'
)
def
taskstatus(task_id):
info = async_result(task_id)

print
(info)

if
info.state ==
'PENDING'
:
response = {

'state'
: info.state,

'current'
:
0
,

'total'
:
1
,

'status'
:
'Pending...'
}

elif
info.state !=
'FAILURE'
:
response = {

'state'
: info.state,

'current'
: info.meta.
get
(
'current'
,
0
),

'total'
: info.meta.
get
(
'total'
,
1
),

'status'
: info.meta.
get
(
'status'

,
''
)
}

if

'result'

in
info.meta:
response[
'result'
] = info.meta[
'result'
]

else
:

# something went wrong in the background job
response = {

'state'
: info.state,

'current'
:
1
,

'total'
:
1
,

'status'
: str(info.meta),
# this is the exception raised
}

return
jsonify(response)

/<task>

最後,需要定義一個啟動後臺任務執行進程的邏輯

# run_worker.py
from
worker
import

Worker
from
app
import

LongTask
if
__name__ ==
"__main__"
:
long_task =
LongTask
()
worker =
Worker
(task=long_task)
worker.start()


至此,整體結構就構建完了,使用一下。

首先運行 redis。

redis-server


然後運行 Flask。

python app.py

最後啟動一下後臺任務執行進程,它相當於 Celery 的 celery-A xxx worker--loglevel=info命令。

python run_worker.py

同時執行多個任務,效果如下


用Python動手實現一個簡單的Celery,並實現異步任務


對應的一些打印如下:

python run_worker.py
Unable
to execute task.
Unable
to execute task.
Unable
to execute task.
task info:
3c7cd8ac
-
7482
-
467b
-b17c-dba2649b70ee succesfully queued
task info:
3c7cd8ac
-
7482
-
467b
-b17c-dba2649b70ee succesfully queued
task info:
3c7cd8ac
-
7482
-
467b
-b17c-dba2649b70ee succesfully queued
task info:
3c7cd8ac
-
7482
-
467b
-b17c-dba2649b70ee succesfully queued
python app.py
*
Serving

Flask
app
"app"

(lazy loading)
*
Environment
: production
WARNING:
Do

not

use
the development server
in
a production environment.

Use
a production WSGI server instead.
*
Debug
mode: on
*
Running
on http:
//127.0.0.1:5000/ (Press CTRL+C to quit)
*
Restarting

with
stat
*
Debugger

is
active!
*
Debugger
PIN:
145
-
285
-
706
127.0
.
0.1
- - [
25
/
Sep
/
2019


11
:
14
:
07
]
"GET / HTTP/1.1"

200
-
task:
3c7cd8ac
-
7482
-
467b
-b17c-dba2649b70ee succesfully queued
127.0
.
0.1
- - [
25
/
Sep
/
2019

11
:
14
:
11
]
"POST /longtask HTTP/1.1"

202
-
<task.async>.
Info

object
at
0x107f50780
>
127.0
.
0.1
- - [
25

/
Sep
/
2019

11
:
14
:
11
]
"GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1"

200
-
<task.async>.
Info

object
at
0x107f50a20
>
127.0
.
0.1
- - [
25
/
Sep
/
2019

11
:
14
:
13
]
"GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1"

200
/<task.async>/<task.async>



需要注意一些,上面的代碼中,使用 Worker 需要實例化具體的任務,此時任務實例與 app.py 中通過接口創建的任務實例是不同的,Worker 利用不同的實例,使用相同的參數,從而實現執行效果相同的目的。


分享到:


相關文章: