文档:http://docs.jinkan.org/docs/celery/getting-started/index.html
Celery的特点是:
-
简单,易于使用和维护,有丰富的文档。
-
高效,单个celery进程每分钟可以处理数百万个任务。
-
灵活,celery中几乎每个部分都可以自定义扩展。
任务队列是一种跨线程、跨机器工作的一种机制.
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
Celery的架构
Celery的架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
一个celery系统可以包含很多的worker和broker
Celery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括RabbitMQ,Redis,MongoDB等
安装
pip install -U celery #-U是update的意思,有就进行更新,没有就安装
#后面单独将celery运行起来就可以了
也可从官方直接下载安装包:https://pypi.python.org/pypi/celery/
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py
python setup.py install
使用
使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。
一般celery任务目录直接放在项目的根目录下即可,路径:
luffyapi/
├── mycelery/
├── config.py # 配置文件
├── __init__.py
├── main.py # 主程序
└── sms/ # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖,也可以每个任务单独一个目录
└── tasks.py # 任务的文件,名称必须是这个!!!
main.py,代码:
# 主程序
from celery import Celery
# 创建celery实例对象
app = Celery("luffy")
# 通过app对象加载配置,文件路径
app.config_from_object("mycelery.config")
# 自动搜索并加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["mycelery.sms","mycelery.email"]) #会自动识别sms目录下面的tasks.py文件中的任务,所以不需写成mycelery.sms.tasks
# 启动Celery的命令
# 强烈建议切换目录到项目的根目录下启动celery!!
# celery -A mycelerymain worker --loglevel=info
配置文件config.py,代码:(文件形式,json形式,对象形式都行)
# 任务队列的链接地址(变量名必须叫这个)
broker_url = 'redis://127.0.0.1:6379/14'
# 结果队列的链接地址(变量名必须叫这个)
result_backend = 'redis://127.0.0.1:6379/15'
创建一个任务文件sms/tasks.py,并创建任务,代码:
# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
from mycelery.main import app
接下来,我们运行celery,在终端,项目根目录下(也就是mycelery的外层目录里面)执行指令
celery -A mycelery.main worker --loglevel=info (或者直接写info也行) #-A是指定celery启动入口
效果如下:
- ** ---------- [config] - ** ---------- .> app: __main__:0x10b24ba50 - ** ---------- .> transport: redis://127.0.0.1:6379/14 - ** ---------- .> results: redis://127.0.0.1:6379/15 - *** --- * --- .> concurrency: 16 (prefork) #表示它开启了16个线程准备来来执行任务,可以在后面执行任务的时候自行测试一下,一共可以有16个任务同时执行 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) #有没有开启其他的事件(比如事件监听等等一些东西)
运行起来之后,如果又添加了新的任务,需要重新启动celery。
然后执行任务,可以在mycelery下面创建一个py文件进行测试,名字随便起,比如叫做runtask.py文件,内容如下
#引入任务 from mycelery.sms.tasks import send_sms #执行任务 send_sms.delay() #这就是将任务交给worker去执行了,这个任务在上面的时候已经加到队列中了,所以调用它的意思就是让worker去队列中找到send_sms这个任务去执行 #然后运行我们这个文件,右键运行就行,celery会在后台一直运行着
去redis中查看,就能看到任务执行结果了
如果想获取任务结果可以通过get方法,或者AsyncResult这个类来拿
方式1: import time from mycelery.sms.tasks import send_sms from mycelery.mail.tasks import send_emailret = send_sms.delay() print(ret,type(ret)) print(ret.ready()) print(ret.id) # time.sleep(3) print(ret.ready()) print(ret.get(timeout=1),)方式2 import time from mycelery.sms.tasks import send_sms from mycelery.mail.tasks import send_emailfrom celery.result import AsyncResult ret = send_sms.delay() #执行的任务如果需要参数,那么就直接在delay方法里面写:send_sms(mobile,sms_code),执行时:delay(mobile,sms_code) async_task = AsyncResult(id=ret.id,app=send_sms)print(async_task.successful()) result = async_task.get() print(result)
celery还有很多可配置的项,还可以拓展很多的方法,并且还能完成定时任务:定时备份数据库,定时分析日志文件等。关于这些,还是建议大家学习一下。
其他参考文档:
http://docs.celeryproject.org/en/latest/getting-started/introduction.html
https://github.com/celery/celery/tree/master/examples/django/
https://www.jianshu.com/p/1840035cb510
https://flower.readthedocs.io/en/latest/screenshots.html
接下来,我们需要把celery和django组合起来一起使用。
把django和celery进行组合
在main.py主程序中对django的配置文件进行加载
# 主程序 import os from celery import Celery # 创建celery实例对象 app = Celery("luffy") #celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做luffy# 把celery和django进行组合,需要识别和加载django的配置文件 import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev') #如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上 import django django.setup()# 通过app对象加载配置 app.config_from_object("mycelery.config")# 加载任务 # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称 # app.autodiscover_tasks(["任务1","任务2"]) app.autodiscover_tasks(["mycelery.sms","mycelery.mail"])# 启动Celery的命令 # 切换目录到mycelery根目录下启动 # celery -A mycelery.main worker --loglevel=info
在需要使用django配置的任务中,直接加载配置,所以我们把注册的短信发送功能,整合成一个任务函数,代码:
from mycelery.main import app from lyapi.libs.yuntengxun.send_sms import send_message@app.task(name='send_sms') def send_sms(code, phone):ret = send_message(code, phone)return ret
在这个任务中,我们需要加载短信发送的sdk和相关的配置常量,所以我们可以直接把django中的短信发送模块和相关的常量配置文件直接剪切到当前sms任务目录中
mycelery/ ├── config.py ├── __init__.py ├── main.py └── sms/├── __init__.py├── tasks.py
再次启动项目即可。
最终在django里面,我们调用Celery来异步执行任务。需要完成2个步骤:
# 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决 from mycelery.sms.tasks import send_sms# 2. 调用任务函数,发布任务 send_sms.delay(code, phone) # send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容