Django系列之Celery异步框架+RabbitMQ使用

在Django项目中,如何集成使用Celery框架来完成一些异步任务以及定时任务呢?

1. 安装

pip install celery		# celery框架
pip install django-celery-beat		# celery定时任务使用
pip install django-celery-results		# celery存储结果使用

2. Django集成celery

settings.py 配置文件中增加如下配置项:

INSTALLED_APPS = [...'celery','django_celery_beat','django_celery_results'
]"""以下是celery的相关配置"""
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False# broker backend 配置:使用rabbitmq作为中间件
CELERY_BROKER_URL = "amqp://devops:devops123@127.0.0.1:5672/alarm"
CELERY_RESULT_BACKEND = "amqp://devops:devops123@127.0.0.1:5672/alarm"# 使用django_celery_beat动态配置任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'# celery序列化和反序列化配置
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']# 下面配置项在有些情况下可以防止死锁  非常重要!
CELERYD_FORCE_EXECV = True
# 任务结果存储的过期时间,默认1天过期。如果beat开启,Celery每天会自动清除,设为0,存储结果永不过期。
CELERY_RESULT_EXPIRES = 60 * 60 * 24
# 每个worker执行1000次任务后死掉,会自动重启worker,防止任务占用太多内存导致内存泄漏
CELERY_MAX_TASKS_PER_CHILD = 1000
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
CELERY_DISABLE_RATE_LIMITS = True
# 单个任务的运行时间限制,否则会被杀死
CELERYD_TASK_TIME_LIMIT = 60 * 60
CELERY_TASK_RETRY = 2# celery 队列配置
from kombu import Exchange, Queue
# consumer_arguments设置队列的优先级
CELERY_TASK_QUEUES = (Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_email', consumer_arguments={'x-priority': 5}),Queue('alarm_queue', Exchange('alarm_exchange'), routing_key='alarm_phone', consumer_arguments={'x-priority': 8}),Queue('calcu_queue', Exchange('calcu_exchange'), routing_key='calcu_feature', consumer_arguments={'x-priority': 10})
)CELERY_TASK_ROUTES = {'alarm.tasks.call_phone': {'queue': 'alarm_queue', 'routing_key': 'alarm_phone'},'alarm.tasks.send_email': {'queue': 'alarm_queue', 'routing_key': 'alarm_email'},'calcu.tasks.execute_calcu': {'queue': 'calcu_queue', 'routing_key': 'calcu_feature'},
}

settings.py 同级目录下,新增一个 celery.py 文件:

from __future__ import absolute_import, unicode_literals    # absolute_import: 使用python的库而不是项目目录下的文件
import os
from celery import Celery
from django.conf import settingsos.environ.setdefault("DJANGO_SETTINGS_MODULE", "celerymq.settings")
celery_app = Celery("celerymq")celery_app.config_from_object("django.conf:settings", namespace="CELERY")
celery_app.autodiscover_tasks(settings.INSTALLED_APPS)

在App中增加一个 tasks.py 文件,用于实现异步任务:

import time
from celery import shared_task@shared_task(ignore_result=True)
def execute_calcu(dataframe):print(f'execute celery task: calcu feature')time.sleep(10)		# 这里写比较耗时的逻辑print(f'execute calcu feature task run over')

在其他文件逻辑中进行异步调用:

execute_calcu.delay(dataframe)

项目启动后,如果有异步任务进来,可以在 RabbitMQ 监控平台看到队列信息: http://127.0.0.1:15672/
在这里插入图片描述

3. 启动命令

启动 worker 去消费数据。

# 启动worker
celery worker -A celerymq -l INFO -n alarm_queue -Q alarm_queue -P eventlet# 启动beat
celery beat -A celerymq -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

在linux中启动worker的话,可以去掉 -P eventlet 参数。

另外,定时任务推荐使用 django-admin 来下发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/266227.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统---基于Pipe实现一个简单Client-Server system

顾得泉:个人主页 个人专栏:《Linux操作系统》 《C/C》 《LeedCode刷题》 键盘敲烂,年薪百万! 一、题目要求 Server是一个服务器进程,只能进行整数平方运算。Client要计算一个整数的平方的平方的平方,即…

数字人对话系统 Linly-Talker

🔥🔥🔥数字人对话系统 Linly-Talker🔥🔥🔥 English 简体中文 欢迎大家star我的仓库 https://github.com/Kedreamix/Linly-Talker 2023.12 更新 📆 用户可以上传任意图片进行对话 介绍 Lin…

单目相机测距(3米范围内)二维码实现方案(python代码 仅仅依赖opencv)

总体思路:先通过opencv 识别二维码的的四个像素角位置,然后把二维码的物理位置设置为 cv::Point3f(-HALF_LENGTH, -HALF_LENGTH, 0), //tl cv::Point3f(HALF_LENGTH, -HALF_LENGTH, 0), //tr cv::Point3f(HALF_LENGTH, HALF_LENGTH, 0), //br cv::P…

django与数据库交互关于当前时间的坑

背景 在线上服务中使用时间进行数据库操作时发现异常,而在本地环境无法成功复现此问题,导致难以进行故障排查。 核心问题 view.py class XxxViewSet(viewsets.ModelViewSet):queryset Xxx.objects.with_status().order_by("status", &quo…

【Jeecg Boot 3 - 第二天】1.1、后端 docker-compose 部署 JEECGBOOT3

一、场景 二、实战 ▶ 2.1 修改配置文件 > 目的一:将 dev 变更为生产环境 prod > 目的二:方便spring项目调用docker同个network下的redis和mysql ▶ 2.2 编写dockerfile ▶ 2.3 编写docker-compose.yaml ▶ 2.4 打…

参数占位符#{}和${}

#是预处理而$是直接替换 Mybatis在处理#{}时,会将SQL中的#{}替换成占位符?,再使用preparedStatement的set方法来赋值。而Mybatis在处理 时,是将 {}时,是将 时,是将{}直接替换成变量的值 我们分别使用#{}和…

外包干了2年,技术退步明显...

📢专注于分享软件测试干货内容,欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!📢交流讨论:欢迎加入我们一起学习!📢资源分享:耗时200小时精选的「软件测试」资…

[MySQL]SQL优化之索引的使用规则

🌈键盘敲烂,年薪30万🌈 目录 一、索引失效 📕最左前缀法则 📕范围查询> 📕索引列运算,索引失效 📕前模糊匹配 📕or连接的条件 📕字符串类型不加 …

【Spring】@SpringBootApplication注解解析

前言: 当我们第一次创建一个springboot工程时,我们会对启动类(xxxApplication)有许多困惑,为什么只要运行启动类我们在项目中自定义的bean无需配置类配置,扫描就能自动注入到IOC容器中?为什么我…

数据结构与算法-Rust 版读书笔记-2线性数据结构-双端队列

数据结构与算法-Rust 版读书笔记-2线性数据结构-双端队列 1、双端队列 deque又称为双端队列,双端队列是与队列类似的项的有序集合。deque有两个端部:首端和尾端。deque不同于队列的地方就在于项的添加和删除是不受限制的,既可以从首尾两端添…

基于JavaWeb+SpringBoot+Vue在线拍卖系统的设计和实现

基于JavaWebSpringBootVue在线拍卖系统系统的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 摘 要 1 Abstract 1 1 系统概述 4 1.1 概述 4 1.2课题意义 4 1.3 主要内容 4 2 …

二叉树题目:在受污染的二叉树中查找元素

文章目录 题目标题和出处难度题目描述要求示例数据范围 前言解法一思路和算法代码复杂度分析 解法二思路和算法代码复杂度分析 题目 标题和出处 标题:在受污染的二叉树中查找元素 出处:1261. 在受污染的二叉树中查找元素 难度 5 级 题目描述 要求…