Skip to main content

celery执行异步任务

在Django中使用celery可以实现异步执行任务的功能。以下是在Django中使用celery的步骤:

第一步:配置celery.py文件

在Django项目的根目录dx_movie/dx_movie下新建celery.py文件,并将以下配置信息复制进去:

dx_movie/dx_movie/celery.py
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dx_movie.settings')

app = Celery('dx_movie')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')

第二步:导入celery.py文件

dx_movie/dx_movie/__init__.py全局配置的初始化文件中,导入celery.py, 代码如下:

dx_movie/dx_movie/__init__.py
import pymysql
from .celery import app as celery_app

pymysql.install_as_MySQLdb()

__all__ = ('celery_app',)

第三步:配置settings.py文件

在Django项目的settings.py中添加以下配置信息:

dx_movie/dx_movie/settings.py
# settings.py
# 配置Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

第四步:编写任务

在trade应用下,创建一个名为tasks.py的文件,编写任务函数:

dx_movie/trade/tasks.py
from datetime import timedelta

from django.utils import timezone
from celery import shared_task

from .models import Order

@shared_task
def add(x, y):
return x + y


@shared_task
def mul(x, y):
return x * y


@shared_task
def xsum(numbers):
return sum(numbers)

第五步: 配置路由

配置一个路由,用于显示task接口。

dx_movie/dx_movie/urls.py
urlpatterns = [

# 新增代码
path('api/tasks/', trade_views.TaskAPIView.as_view())
]

第六步: 创建视图

在trade/views.py下创建TaskAPIView, 代码如下:

dx_movie/trade/views.py
from .tasks import add, mul, xsum

class TaskAPIView(APIView):

def get(self, request):
result1 = add.delay(3, 4)
print(f'add : {result1}')

result2 = mul.delay(4,5)
print(f'mul: {result2}')

result3 = xsum.delay([1,2,3])
print(f'xsum: {result3}')

return Response('执行task')

注意:task.delay()表示异步执行任务,task()表示同步执行任务。

第七步:启动redis 和 celery

先启动redis, 查看redis小结。

执行以下命令启动celery worker:

celery -A dx_movie worker -l info

第八步:监测celery任务

可以安装并启动Flower监测celery任务:

celery -A dx_movie flower

测试

访问tasks接口,示例如下:

图86-访问tasks接口

接下来,就查看flower, 示例如下:

图86-flower监视任务

每次访问tasks接口,flower都会检测到新增任务。

以上就是在Django中使用celery执行异步任务的步骤。通过使用celery,我们可以将一些耗时的任务放到后台执行,提高系统的响应速度。

请根据你的项目实际情况进行相应的配置和调用任务函数。