Celery在Django中的使用介紹

Celery簡介

celery是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,並且提供維護這樣一個系統的必須工具。

它是一個專注於實時處理的任務隊列,同時也支持任務調度。

何為任務隊列

任務隊列:是一種在線程和機器間分發任務的機制。

Celery在Django中的使用介紹

celery的三大組成部分

worker

任務執行單元-->Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。

broker(存tasks的倉庫)

消息中間件--> Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等

backend (存results的倉庫)

ask result store用來存儲Worker執行的任務的結果,Celery支持以不同方式存儲任務的結果,包括AMQP, redis等

Celery在Django中的使用介紹

使用場景

  • 異步任務:將耗時操作任務提交給celery去異步執行,比如發送短信、郵件,消息推送、音視頻處理等。
  • 定時任務:定時執行某件事情,比如每天數據統計

基本命令

# 1. 啟動celery服務:
# 非windows:
# 指令:celery worker -A celery_task(celery項目文件) -l info
# windows: 需要先下載eventlet模塊,pip install eventlet
# 指令: celery worker -A celery_task -l info -P eventlet
# 2. 添加任務:手動添加,需要自定義添加任務腳本;自動添加任務,在celery.py中配置
# 3. 獲取結構:手動獲取,需要自定義任務腳本

celery在Django項目中的使用

celery目錄結構

project 

|---celery_task
|---celery.py # celery連接和相關配置,且名字必須是celery.py,如果要自動添加任務,那麼相 關配置也在celery.py裡配置;
|---tasks.py # 所有任務函數
|---add_task.py # 手動添加任務:立即任務,延時任務,定時任務;
|---get_result.py # 獲取結果

後面兩個文件可以不用添加,看需求來。

Celery在Django中的使用介紹

使用

celery.py

from celery import Celery
# 導入時間相關包,用法看下面
from datetime import timedelta
from celery.schedules import crontab
# 因為需要調用Django項目中的models,所有需要添加Django環境
import os,django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'bookapi.settings.dev')
django.setup()
# 添加任務的倉庫,這裡使用了Redis
broker = "redis://127.0.0.1:6379/11"
# 接收處理結果的倉庫
backend = "redis://127.0.0.1:6379/12"
# 指定需要處理的任務
include = ['celery_task.tasks']
app = Celery(broker=broker,backend=backend,include=include)
# 配置任務時區
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
# 配置定時任務
app.conf.beat_schedule = {
'recommend-task': {
'task': 'celery_task.tasks.recommend_num',
# 'schedule': timedelta(seconds=20),
'schedule': crontab(hour=24),
'args': ()
},
'monthly-task': {
'task': 'celery_task.tasks.monthly_num',
# 'schedule': timedelta(seconds=60),
'schedule': crontab(day_of_month=1,hour=0),
'args': ()
}
}

tasks.py

from .celery import app
from bookapi.apps.user import models
@app.task
def recommend_num():
user_list = models.User.objects.all()
# print(user_list) ## <queryset>, <user>]>
for user in user_list:
models.User.objects.filter(username=user.username).update(recommend_nums=3)
@app.task
def monthly_num():
user_list = models.User.objects.all()
# print(user_list) ## <queryset>, <user>]>
for user in user_list:
models.User.objects.filter(username=user.username).update(monthly_nums=2)
/<user>/<queryset>/<user>/<queryset>

最後,小編想說:我是一名python開發工程師,整理了一套最新的python系統學習教程,想要這些資料的可以關注私信小編“01”即可,希望能對你有所幫助


分享到:


相關文章: