celery
介绍
https://github.com/celery/celery/
https://docs.celeryq.dev/en/stable/
-
celery是一个分布式异步任务框架,是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务,是一个专注于实时处理的任务队列,支持任务调度,所以 celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow,只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。
-
celery能做什么
- 定时任务
- 异步任务
- 延迟任务
celery的使用场景
# 1 异步任务-一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。-视频转码、邮件发送、消息推送等等
# 2 定时任务-定时推送消息、定时爬取数据、定时统计数据等
# 3 延迟任务-提交任务后,等待一段时间再执行某个任务
1 Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:2 Celery Beat,任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。3 Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。4 Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。5 Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。6 Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
快速使用
# 0 创建Python项目# 1 创建虚拟环境# 2 安装celery
pip install celery# 3 安装redis(消息队列和结果存储使用redis)
pip install redis# 4 安装eventlet(win 平台,如果是mac,linux不需要)
pip install eventlet
celery_demo/main.py 主文件
from celery import Celery
import timebroker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"# 创建app对象
app = Celery("test", broker=broker, backend=backend)@app.task
def add(x, y):time.sleep(2)return x + y@app.task
def send_msg(mobile, code):print(f"手机号:{mobile},发送短信{code}成功")return "发送成功!"
celery_demo/task_add.py
from main import add, send_msg# 同步执行任务
res = add(4, 5)
print(res)# 提交到broker消息队列中,异步执行
res = add.delay(4, 5)
print(res) # c1b10e79-37a5-41c5-8fc5-5a189bce1951
# 返回的是任务的id号,任务被提交到消息中间件 broker redis
查看提交的任务
让worker执行任务
# win启动
celery -A main worker -l info -P eventlet
# mac linux
celery -A main worker -l info
结果存储查看结果
# 1 直接看redis 有数据# 2 通过代码,拿到结果
from main import app
from celery.result import AsyncResult
id = 'c1b10e79-37a5-41c5-8fc5-5a189bce1951'
if __name__ == '__main__':result = AsyncResult(id=id, app=app)if result.successful():result = result.get()print(result) # 9 因为之前执行的是addelif result.failed():print('任务失败')elif result.status == 'PENDING':print('任务等待中被执行')elif result.status == 'RETRY':print('任务异常后正在重试')elif result.status == 'STARTED':print('任务已经开始被执行')
celery包结构
目录结构
celery_demo├── celery_task # celery包│ ├── __init__.py # 包文件│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py│ ├── user_tasks.py # 所有任务函数│ └── order_tasks.py # 所有任务函数├── add_task.py # 添加任务└── get_result.py # 获取结果
celery.py
from celery import Celerybroker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"app = Celery("testcelery",backend=backend,broker=broker,include=["celery_task.order_tasks", "celery_task.user_tasks"],
)
user_tasks.py
from .celery import app
import time# 用户相关任务
@app.task
def add(x, y):time.sleep(3)return x + y
order_task.py
from .celery import app# 订单相关任务
# 下单成功,发送短信
@app.task
def send_sms(mobile, code):print(f"手机号:{mobile},发送成功{code}")return "ok,发送成功!"
add_task.py 提交任务
from celery_task.order_tasks import send_sms# 1 同步调用
# res=send_sms('111111',888)
# print(res)# 2 提交到任务队列被 worker执行
res = send_sms.delay("15666777888", 9999)
print(res) # 39a9a1f8-9907-4589-a5ba-6b1b291b42ab
get_result.py 查看任务结果
from celery_task.celery import app
from celery.result import AsyncResultid = "39a9a1f8-9907-4589-a5ba-6b1b291b42ab"
if __name__ == "__main__":result = AsyncResult(id=id, app=app)if result.successful():result = result.get()print(result)elif result.failed():print("任务失败")elif result.status == "PENDING":print("任务等待中被执行")elif result.status == "RETRY":print("任务异常后正在重试")elif result.status == "STARTED":print("任务已经开始被执行")
celery异步-延迟-定时任务
异步任务
任务名.delay(参数)
延迟任务
任务名.apply_async(args=[参数],eta=时间对象)
from celery_task.user_tasks import add
from datetime import datetime, timedelta# datetime.utcnow() 取utc时间---》默认使用utc时间
# 当前时间加了30s
eta = datetime.utcnow() + timedelta(seconds=30)# eta 要放时间对象
res = add.apply_async(args=[5, 6], eta=eta)
print(res) # f2dc3e99-4232-48dd-860b-cfe4bb2fe8b7
等了30秒返回结果
定时任务
用beat启动
要写配置文件 在celery中修改
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'add-task': {'task': 'celery_task.user_task.add','schedule': timedelta(seconds=3),# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点'args': (300, 150),},'send-sms-task': {'task': 'celery_task.order_task.send_sms',# 'schedule': timedelta(seconds=30),'schedule': crontab(hour=11,minute=20), # 每天11点20执行'args': ('189232222',888),},
}
启动beat
只要启动了beat就自动会按照设置的时间提交任务
celery -A celery_task beat -l debug
启动worker
celery -A celery_task worker -l info -P eventlet
django中使用celery
通用方案
1 把项目结构的包直接放到项目根路径中
2 在视图函数中提交任务
3 启动worker
4 运行django,正常使用接口
from celery_task.user_tasks import add
class CeleryView(GenericViewSet):def list(self, request, *args, **kwargs):res = add.delay(4, 5)return APIResponse(msg=str(res))# 3 启动worker
celery -A celery_task worker -l info -P eventlet
# 4 运行django,正常使用接口即可
python manage.py runserver
注意:
要在celery.py中配置django的环境变量让celery能识别到
from celery import Celery
import osos.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"app = Celery("testcelery",backend=backend,broker=broker,include=["celery_task.order_tasks", "celery_task.user_tasks"],
)
启动的时候直接用包名启动即可,在根路径
(luffy) PS D:\2023propygo\luffy_api>
celery -A celery_task worker -l info -P eventlet
celery官方方案
# 1 安装模块
pip install Django==3.2.22
pip install celery
pip install redis
pip install eventlet #在windows环境下需要安装eventlet包
luffy_api/celery.py
from celery import Celery
import django
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
django.setup()app = Celery('testcelery')
app.config_from_object("django.conf:settings",namespace="CELERY")app.autodiscover_tasks()
common_settings.py/dev.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
# BACKEND配置,使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
luffy_api/user/tasks.py
from celery import shared_task@shared_task
def add(x, y):return x + y
views.py
class CeleryTestView(GenericViewSet):def list(self, request, *args, **kwargs):res = add.delay(1,2)return APIResponse(msg=str(res))
启动的时候要使用app名字.celery,比如:
(luffy) PS D:\2023propygo\luffy_api> celery -A luffy_api.celery worker -l info -P eventlet
并发量和qps
# 1 并发量是此刻有多少并发--》请求
# 2 qps:每秒钟响应的数量# 并发量定了:10,这个接口2s钟响应-2s能处理10个用户请求-1s能处理5个用户请求-qps就是5-提高qps,如何做?-1 提高并发--》提不了了-2 提供响应速度--》0.5s响应回去-3 1s钟就能处理20请求--》qps就是20# 使用异步,提高项目的qps
定时更新缓存
加入缓存之后,数据库改了,缓存中也要改,缓存双写一致性
celery_task/home_tasks.py
查询出来的数据不带前缀,需要自己手动拼
from .celery import app
from home.models import Banner
from home.serializer import Bannerserializer
from django.conf import settings
from django.core.cache import cache@app.task
def update_banner():queryset = (Banner.objects.all().filter(is_delete=False, is_show=True).order_by("orders")[0 : settings.BANNER_COUNT])serializer = Bannerserializer(instance=queryset, many=True)for i in serializer.data:i["image"] = "http://127.0.0.1:8000" + i["image"]cache.set("banner_list", serializer.data)return "轮播图缓存更新成功"
celery_task/celery.py
定时任务
包结构要在app的include中注册,才能被检索到
from celery import Celery
import osos.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"app = Celery("testcelery",backend=backend,broker=broker,include=["celery_task.home_tasks"],
)# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {"banner-task": {"task": "celery_task.home_tasks.update_banner","schedule": timedelta(seconds=30),"args": (),},
}
启动任务
celery -A celery_task beat -l info
celery -A celery_task worker -l info
task和share_task的区别
django-celery中有两个装饰函数。一个是@task,另一个是@share_task。两者区别在于,前者只能自己这个APP使用。后者是一个全局的配置,多个初始化的APP都可以使用。
task
装饰函数,将函数当成celery的任务函数
import time
from celery import Celery
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"app = Celery("testcelery",backend=backend,broker=broker,
)@app.task
def add(x, y):time.sleep(10)return x + y
share_task
-
装饰函数,将函数当成celery的任务函数
-
不依赖某个celery对象,而是加载到内存之后自动添加到celery对象中
-
与多个celery对象进行关联
from celery import shared_task@shared_task
def add(x, y):return x + y
官方方案配置定时任务
dev.py
一定要是tasks才行
CELERY_BEAT_SCHEDULE = {'every_1_minutes': {'task': 'home.tasks.add','schedule': timedelta(seconds=2),'args': (1,2)},
}
tasks.py
from celery import shared_task@shared_task
def add(x, y):return x + y
celery -A luffy_api.celery worker -l info -P eventlet
celery -A luffy_api.celery beat -l debug
admin配置定时任务(手动)
pip install django-celery-beat
dev.py
INSTALLED_APPS = [....'django_celery_beat',
]CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = True
celery.py
# 配置和django设置中一样的时区
from django.conf import settings
app.conf.timezone = settings.TIME_ZONE
# 迁移数据库
python manage.py migrate django_celery_beat
#在两个控制台分别启动woker和beat
celery -A luffy_api.celery worker -l debug -P eventlet
celery -A luffy_api.celery beat -l debug
访问admin
admin监视任务
- 在控制台监控任务执行情况,还不是很方便,最好是能够通过web界面看到任务的执行情况,如有多少任务在执行,有多少任务执行失败了等
- 这个Celery也是可以做到了,就是将任务执行结果写到数据库中,通过web界面显示出来。
- 这里要用到django-celery-results插件。
- 通过插件可以使用Django的orm作为结果存储,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作
pip install django-celery-results
INSTALLED_APPS = (
...,
'django_celery_results',
)# 使用django-orm 作为结果存储
CELERY_RESULT_BACKEND = 'django-db'
# 迁移数据库
python manage.py migrate django_celery_results
访问admin
- 后期后台管理可以自己写
- 可以直接使用django的orm取数据
- 也可以放到redis中,自己写接口处理
Flower监控celery任务
如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全
Flower 是一个用于监控和管理 Celery 集群的开源 Web 应用程序。它提供有关 Celery workers 和tasks状态的实时信息
Flower可以:
1 实时监控celery的Events
-查看任务进度和历史记录
-查看任务详细信息(参数、开始时间、运行时间等)2 远程操作
-查看workers 状态和统计数据
-关闭并重新启动workers 实例
-控制工作池大小和自动缩放设置
-查看和修改工作实例消耗的队列
-查看当前正在运行的任务
-查看计划任务(预计到达时间/倒计时)
-查看保留和撤销的任务
-应用时间和速率限制
-撤销或终止任务3 Broker 监控
-查看所有 Celery 队列的统计信息
pip install flower
# 方式一
celery -A celery_demo flower --port-5555# 方式二
celery --broker=redis://127.0.0.1:6379/1 flower# 浏览器访问
http://127.0.0.1:5555/
任务异常自动告警
虽然可以通过界面来监控了,但是我们想要得更多,人不可能天天盯着界面看吧,如果能实现任务执行失败就自动发邮件告警就好了。这个Celery当然也是没有问题的。
通过钩子程序在异常的时候触发邮件通知
tasks.py
from celery import shared_task
import time
from celery import Task
from django.core.mail import send_mail
from django.conf import settings# 成功失败邮件告警
class SendEmailTask(Task):def on_success(self, retval, task_id, args, kwargs):info = f'任务成功-- 任务id是:{task_id} , 参数是:{args} , 执行成功 !'send_mail('celery任务监控成功告警', info, settings.EMAIL_HOST_USER, ["ssrheart@outlook.com",])print('------------成功')def on_failure(self, exc, task_id, args, kwargs, einfo):info = f'任务失败-- 任务id为:{task_id} , 参数为:{args} , 失败 ! 失败信息为: {exc}'send_mail('celery任务监控失败告警', info, settings.EMAIL_HOST_USER, ["ssrheart@outlook.com",])print('------------失败')def on_retry(self, exc, task_id, args, kwargs, einfo):print(f'任务id位::{task_id} , 参数为:{args} , 重试了 ! 错误信息为: {exc}')@shared_task(base=SendEmailTask, bind=True)
def add(a,b):time.sleep(1)return a+b@shared_task()
def send_email(mail):print(f'给{mail}发送邮件了')return '成功'# celery -A celery_demo worker -l debug -P eventlet
# celery -A celery_demo beat -l debug
# celery -A celery_demo flower --port-5566
django发送邮件
# 1 邮箱开启smtp
# 2 django配置文件配置
### 发送邮件
EMAIL_HOST = 'smtp.qq.com' # 如果是 qq 改成 smtp.qq.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '446367977@qq.com' # 帐号
EMAIL_HOST_PASSWORD = '' # 密码
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
#这样收到的邮件,收件人处就会这样显示
#DEFAULT_FROM_EMAIL = 'heart<'446367977@qq.com>'
EMAIL_USE_SSL = True #使用ssl
#EMAIL_USE_TLS = False # 使用tls
#EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True# 3 发送邮件
class EmailView(ViewSet):def list(self, request, *args, **kwargs):to_user = request.query_params.get('to_user')send_mail('test1', 'test', settings.EMAIL_HOST_USER, [to_user, ])return APIResponse(msg=f'邮件已经发送:{str(to_user)}')
异步秒杀方案
# 秒杀功能- qps要高:承载住很多用户1s内把功能完成-创建订单-扣减库存- 效率要高# 同步秒杀-假设秒杀需要10s钟,项目并发量是3,总共5件商品要秒杀-10s内,只有3个人能进入到系统,并且开始秒杀# 异步秒杀-假设秒杀需要10s,项目并发量是3,总共5个商品要秒杀-使用异步,用户提交后,立马返回-10s内,可以响应很多很多用户提交秒杀任务:假设提交了100个用户-这100个用户中只有5个成功
同步秒杀
前端
<template><div><Header></Header><div style="padding: 50px;margin-left: 100px"><h1>Go语言课程</h1><img src="http://blog.ssrheart.top/img/202405171633804.png"height="300px"width="300px"><br><el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程</el-button></div><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><Footer></Footer></div>
</template><script setup>
import Header from "../components/Headers.vue"
import Footer from "../components/Footer.vue"
import {ref} from "vue"
import axios from "axios";const fullscreenLoading = ref(false)
const task_id = ref('')
let t = nullconst handleSeckill = () => {//同步秒杀fullscreenLoading.value = true;axios.post('http://127.0.0.1:8000/api/v1/home/seckill/seckill/', {course_id: '99',}).then(res => {fullscreenLoading.value = false;alert(res.data.msg)}).catch(err => {this.fullscreenLoading = false;alert(err)})
}</script>
后端
class SeckillView(ViewSet):@action(methods=['POST'], detail=False)def seckill(self, request, *args, **kwargs):'''#1 取出传入的 课程id#2 查询课程 是否还有剩余 1s#2.1 有剩余,开始下单扣减库存 1s#2.2,在订单表中生成一条记录 2s#2.3 秒杀成功返回给前端#3 课程没有剩余,秒杀失败,返回给前端'''course_id = request.data.get('course_id')#print('根据课程id:%s,查询课程是否还有剩余,耗时3s' % course_id)time.sleep(1)res = random.choice([True, False])if res: # 库存够print('扣减库存,耗时3s')time.sleep(1)print('下单,耗时4s')time.sleep(2)return APIResponse(msg='恭喜您秒到了')else:return APIResponse(code=101, msg='库存不足,秒杀失败')
异步秒杀
前端
<template><div><Header></Header><div style="padding: 50px;margin-left: 100px"><h1>Go语言课程</h1><img src="http://blog.ssrheart.top/img/202405171633804.png"height="300px"width="300px"><br><el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程</el-button></div><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><Footer></Footer></div>
</template><script setup>import Header from "../components/Headers.vue"import Footer from "../components/Footer.vue"import {ref} from "vue"import axios from "axios";const fullscreenLoading = ref(false)const task_id = ref('')let t = nullconst handleSeckill = () => {// 异步秒杀fullscreenLoading.value = true;axios.post('http://127.0.0.1:8000/api/v1/home/seckill/seckill/',{course_id: '99'}).then(res => {// 在排队,转圈的,还需要继续显示alert(res.data.msg)task_id.value = res.data.task_id// 继续发送请求---》查询是否秒杀成功:1 成功 2 没成功 3 秒杀任务还没执行// 启动定时任务,每隔1s,向后端发送一次请求t = setInterval(() => {axios.get('http://127.0.0.1:8000/api/v1/home/seckill/get_result/', {params:{task_id: task_id.value}}).then(res => {// 100 成功,success : 1 成功 0 失败 2 还没开始if (res.data.success == '1') {// 转圈框不显示fullscreenLoading.value = false;// 停止定时任务clearInterval(t)t = nullalert(res.data.msg)} else if (res.data.success == '0') {// 转圈框不显示fullscreenLoading.value = false;// 停止定时任务clearInterval(t)t = nullalert(res.data.msg)} else {// alert(res.msg)console.log(res.msg)}})}, 1000)}).catch(err => {fullscreenLoading.value = false;alert(err)})}
</script><style scoped></style>
后端
class SeckillView(ViewSet):@action(methods=['POST'], detail=False)def seckill(self, request, *args, **kwargs):course_id = request.data.get('course_id')task_id = seckill.delay(course_id)return APIResponse(msg='您正在排队', task_id=str(task_id))@action(methods=['GET'], detail=False)def get_result(self, request, *args, **kwargs):task_id = request.query_params.get('task_id')a = AsyncResult(id=task_id)if a.successful():result = a.get() # True 和 Falseif result:return APIResponse(success='1', msg='秒杀成功')else:return APIResponse(success='0', msg='秒杀失败')elif a.status == 'PENDING':print('任务等待中被执行')return APIResponse(success='2', msg='任务等待中被执行')else:return APIResponse(success='3', msg='秒杀任务正在执行')
tasks.py
from celery import shared_task
import time,random@shared_task
def seckill(course_id):print('根据课程id:%s,查询课程是否还有剩余,耗时2s' % course_id)time.sleep(2)res = random.choice([True, False])if res: # 库存够print('扣减库存,耗时1s')time.sleep(1)print('下单,耗时2s')time.sleep(2)return Trueelse:return False