Django项目中使用Celery

Celery是Python开发分布式任务列队的处理库。可以异步分布式地异步处理任务,也可定时执行任务等等。通常我们可以在Django执行一些比较耗时的任务(例如批量发邮件)和后台定时任务等。

研究发现,在Django中使用celery有两种方式: 1. 使用django-celery应用; 2. 直接使用Celery。

我们这里选择的是第二种方案。

1. Celery中间人的选择

官方给出的broker列表如下

Name Status Monitoring Remote Control
RabbitMQ Stable Yes Yes
Redis Stable Yes Yes
Amazon SQS Stable No No
Zookeeper Experimental No No
至于他们之间的优劣,大家有兴趣可以自行查阅,本文不做赘述,这里使用Redis作为broker。

2. 安装celery

Celery 提交到了 Python Package Index(PyPI)上,所以你可以用标准的 Python 工具,诸如 pipeasy_install 来安装:

pip install celery

3. Django项目中使用celery

这里我也简单做一个示例。

首先,确保celery和redis已经安装好了,并且已经启动了Redis服务。 打开settings.py所在的文件夹,新建celery.py文件。加入如下代码(注意,我安装的celery版本为4.1.2。其他celery的版本代码可能不同):

# -*- coding: utf-8 -*-
from __future__ import absolute_import

import os

from celery import Celery, platforms

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_site.settings')

# from django.conf import settings

platforms.C_FORCE_ROOT = True   # 允许用root用户启动celery
# 实例化Celery
app = Celery('my_site')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 使用django的settings文件配置celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# Celery加载所有注册的应用
app.autodiscover_tasks()

这个文件还没被加载,接着打开settings.py同个目录下的__init__.py文件。让运行该Django项目的时候,加载该文件配置Celery。修改代码如下:

# -*- coding: utf-8 -*-
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
# 引入celery实例对象
from .celery import app as celery_app
__all__ = ['celery_app']

还需在settings.py中设置celery,尤其是中间人的设置。若不设置中间人,会提示无法连接中间人的错误。在settings.py文件中添加如下设置:

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from celery.schedules import crontab

CELERY_WORKER_MAX_TASKS_PER_CHILD = 100000  # 每个worker执行10w个任务就会被销毁,可防止内存泄露
# celery中间人 redis://redis服务所在的ip地址:端口/数据库号
CELERY_BROKER_URL = "redis://127.0.0.1:6379/3"
# celery结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/4"
CELERY_RESULT_EXPIRES = 3600 * 24  # 任务多久被清除
CELERY_TASK_IGNORE_RESULT = False  # 一般不关注结果,请开启该设置,如果要存结果,请配置CELERY_RESULT_BACKEND
# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_TASK_DEFAULT_EXCHANGE = 'mysite_exchange'
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_TASK_DEFAULT_QUEUE = 'mysite_queue'  # 默认是celery,一般修改
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'

# crotab任务,这里是下面即将出现的定时任务
CELERY_BEAT_SCHEDULE = {
    'submit_urls_to_baidu': {
        'task': 'article.tasks.submit_urls_to_baidu',
        'schedule': crontab(minute=0, hour="*/1"),  # 每小时执行
    },
}

4. 把耗时任务和定时任务丢给celery处理

在app应用中新建文件tasks.py,将定时给百度推送文章链接的代码放到该文件中并用定义为celery任务。tasks.py 文件如下代码:

# -*- coding: utf-8 -*-

import requests
from article.models import Article
from article.constants import BlogStatus, DOMAIN
from celery import shared_task
from utils.libs.logger.syslogger import SysLogger


@shared_task
def submit_urls_to_baidu():
    articles = Article.objects.filter(status=BlogStatus.PUBLISHED).order_by('-id')
    urls = [DOMAIN + article.get_absolute_url() for article in articles]
    api = 'http://data.zz.baidu.com/urls?site=yangsihan.com&token=xxxxxxxxxx'
    response = requests.post(api, data='\n'.join(urls))
    SysLogger.info(response.content.decode())
    print (response.content.decode())

当然还可以写一个异步任务也放到 tasks.py 里面,比如:

# coding:utf-8
from celery.decorators import task 
import time

@task
def sendmail(email):
    print('start send email to %s' % email)
    time.sleep(5) #休息5秒
    print('success')
    return True

作为异步任务的话,在调用异步任务的地方就要改成类似如下代码:

#coding:utf-8
from django.shortcuts import render
from django.http import HttpResponse

from .models import Article
from .tasks import sendmail #引用tasks.py文件的中sendmail方法
import json

def home(request):
    #耗时任务,发送邮件(用delay执行方法)
    sendmail.delay('test@test.com')

    #其他行为
    data = list(Article.objects.values('name'))
    return HttpResponse(json.dumps(data), content_type = 'application/json')

5. 本地启动celery并测试

启动celery之前,确保已经安装redis并启动redis服务。 本地开发环境运行redis-cli看是否可以正常连接,若不行,再手工执行redis-server命令并保持窗口即可。 接着,启动celery worker。这个worker是用于异步执行任务的“工作者”。进入manage.py文件所在的目录,执行如下命令:

Celery -A my_site worker -l info  # my_site是项目名

出现类似如下窗口和消息,则正常执行。 celery worker会扫描django项目中有哪些task任务,并加入进来。见上图的蓝色字下[tasks]字样。

然后启动celery beat:

Celery -A my_site beat -l info

结果类似如下图: 最后,再启动django服务器。这个大家熟悉的python manage.py runserver。我们的异步任务和定时任务就完成了。

在Redis的使用过程中,可能会出现AttributeError: 'float' object has no attribute 'iteritems'这样的错误,原因在于3.x版本的Redis有bug,你需要使用Redis==2.10.6。具体可参考:https://github.com/celery/celery/issues/5175

相关链接:
杨仕航博客