Django项目中使用Celery
Celery是Python开发分布式任务列队的处理库。可以异步分布式地异步处理任务,也可定时执行任务等等。通常我们可以在Django执行一些比较耗时的任务(例如批量发邮件)和后台定时任务等。
研究发现,在Django中使用celery有两种方式:
- 使用django-celery应用;
- 直接使用Celery。
我们这里选择的是第二种方案。
1. Celery中间人的选择
官方给出的broker列表如下
Name | Status | Monitoring | Remote Control |
---|---|---|---|
RabbitMQ | Stable | Yes | Yes |
Redis | Stable | Yes | Yes |
Amazon SQS | Stable | No | No |
Zookeeper | Experimental | No | No |
至于他们之间的优劣,大家有兴趣可以自行查阅,本文不做赘述,这里使用Redis作为broker。
2. 安装celery
Celery 提交到了 Python Package Index(PyPI)
上,所以你可以用标准的 Python 工具,诸如 pip
或 easy_install
来安装:
pip install celery
3. Django项目中使用celery
这里我也简单做一个示例。
首先,确保celery和redis已经安装好了,并且已经启动了Redis服务。
打开settings.py
所在的文件夹,新建celery.py
文件。加入如下代码(注意,我安装的celery版本为4.1.2。其他celery的版本代码可能不同):
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
from celery import Celery, platforms
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_site.settings')
# from django.conf import settings
platforms.C_FORCE_ROOT = True # 允许用root用户启动celery
# 实例化Celery
app = Celery('my_site')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 使用django的settings文件配置celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# Celery加载所有注册的应用
app.autodiscover_tasks()
这个文件还没被加载,接着打开settings.py
同个目录下的__init__.py
文件。让运行该Django项目的时候,加载该文件配置Celery。修改代码如下:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
# 引入celery实例对象
from .celery import app as celery_app
__all__ = ['celery_app']
还需在settings.py
中设置celery,尤其是中间人的设置。若不设置中间人,会提示无法连接中间人的错误。在settings.py
文件中添加如下设置:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from celery.schedules import crontab
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100000 # 每个worker执行10w个任务就会被销毁,可防止内存泄露
# celery中间人 redis://redis服务所在的ip地址:端口/数据库号
CELERY_BROKER_URL = "redis://127.0.0.1:6379/3"
# celery结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/4"
CELERY_RESULT_EXPIRES = 3600 * 24 # 任务多久被清除
CELERY_TASK_IGNORE_RESULT = False # 一般不关注结果,请开启该设置,如果要存结果,请配置CELERY_RESULT_BACKEND
# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_DEFAULT_EXCHANGE = 'mysite_exchange'
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_TASK_DEFAULT_QUEUE = 'mysite_queue' # 默认是celery,一般修改
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
# crotab任务,这里是下面即将出现的定时任务
CELERY_BEAT_SCHEDULE = {
'submit_urls_to_baidu': {
'task': 'article.tasks.submit_urls_to_baidu',
'schedule': crontab(minute=0, hour="*/1"), # 每小时执行
},
}
4. 把耗时任务和定时任务丢给celery处理
在app应用中新建文件tasks.py
,将定时给百度推送文章链接的代码放到该文件中并用定义为celery任务。tasks.py
文件如下代码:
# -*- coding: utf-8 -*-
import requests
from article.models import Article
from article.constants import BlogStatus, DOMAIN
from celery import shared_task
from utils.libs.logger.syslogger import SysLogger
@shared_task
def submit_urls_to_baidu():
articles = Article.objects.filter(status=BlogStatus.PUBLISHED).order_by('-id')
urls = [DOMAIN + article.get_absolute_url() for article in articles]
api = 'http://data.zz.baidu.com/urls?site=yangsihan.com&token=xxxxxxxxxx'
response = requests.post(api, data='\n'.join(urls))
SysLogger.info(response.content.decode())
print (response.content.decode())
当然还可以写一个异步任务也放到 tasks.py
里面,比如:
# coding:utf-8
from celery.decorators import task
import time
@task
def sendmail(email):
print('start send email to %s' % email)
time.sleep(5) #休息5秒
print('success')
return True
作为异步任务的话,在调用异步任务的地方就要改成类似如下代码:
# coding:utf-8
from django.shortcuts import render
from django.http import HttpResponse
from .models import Article
from .tasks import sendmail #引用tasks.py文件的中sendmail方法
import json
def home(request):
# 耗时任务,发送邮件(用delay执行方法)
sendmail.delay('test@test.com')
# 其他行为
data = list(Article.objects.values('name'))
return HttpResponse(json.dumps(data), content_type = 'application/json')
5. 本地启动celery并测试
启动celery之前,确保已经安装redis并启动redis服务。
本地开发环境运行redis-cli
看是否可以正常连接,若不行,再手工执行redis-server
命令并保持窗口即可。
接着,启动celery worker。这个worker是用于异步执行任务的“工作者”。进入manage.py
文件所在的目录,执行如下命令:
Celery -A my_site worker -l info # my_site是项目名
出现类似如下窗口和消息,则正常执行。
celery worker会扫描django项目中有哪些task任务,并加入进来。见上图的蓝色字下[tasks]
字样。
然后启动celery beat:
Celery -A my_site beat -l info
结果类似如下图:
最后,再启动django服务器。这个大家熟悉的python manage.py runserver
。我们的异步任务和定时任务就完成了。
在Redis的使用过程中,可能会出现
AttributeError: 'float' object has no attribute 'iteritems'
这样的错误,原因在于3.x版本的Redis有bug,你需要使用Redis==2.10.6。具体可参考:https://github.com/celery/celery/issues/5175
相关链接:
杨仕航博客