Django框架-使用celery(一):django使用celery的通用配置,不受版本影响

目录

一、依赖包情况

二、项目目录结构

   2.1、怎么将django的应用创建到apps包

三、celery的配置

2.1、celery_task/celery.py

2.2、celery_task/async_task.py

2.3、celery_task/scheduler_task.py

2.4、utils/check_task.py

四、apps/user中配置相关处理视图

4.1、基本配置

4.2、user的models

4.3、user的视图函数

五、调用函数测试

5.1、启动项目

5.2、测试项目:Postman接口工具

六、报错


一、依赖包情况

python==3.9.0

django==3.2.0

celery==5.3.1

django-redis==5.3.0

eventlet==0.33.3  #windows系统需要使用到

注意:还需要在系统中安装好redis数据库,不然无法使用。

二、项目目录结构

                 

   2.1、怎么将django的应用创建到apps包

·1、创建user应用

cd apps
python ../manage.py startapp user

   2、修改user包下的apps.py模块

from django.apps import AppConfigclass UserConfig(AppConfig):default_auto_field = 'django.db.models.BigAutoField'#原来是 name = 'user',改成下面的name = 'apps.user'

3、注册到settings.py文件中

INSTALLED_APPS = ['django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','apps.user.apps.UserConfig', #注册user应用,from apps.user.apps import UserConfig 
]

三、celery的配置

概述:celery应用程序和任务都放到celery_task包中,其中celery.py是创建Celery的实例对象的,async_task.py用来写异步任务,scheduler_task.py用来写定时任务的。utils/check_task.py用来检测任务id是否结束并获取任务的返回值的。

2.1、celery_task/celery.py

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta# 消息中间件,密码是你redis的密码
# broker='redis://:123456@127.0.0.1:6379/2' 密码123456
broker = 'redis://127.0.0.1:6379/0'  # 无密码# 任务结果存储
backend = 'redis://127.0.0.1:6379/1'#包含任务的所有模块的导入路径:
task_module = ['celery_task.async_task', #写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task', #写任务模块导入路径,该模块主要写定时任务的方法
]# 生成celery对象,'task'相当于key,用于区分celery对象
# broker是指定消息处理,backend是指定结果后端的存储位置 include参数需要指定任务模块
app = Celery('task', broker=broker, backend=backend, include=task_module)# 配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务配置
app.conf.beat_schedule = {# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},#名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',# 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  #定时任务需要的参数},#缓存用户数据到cache中'cache-user-func':{'task':'celery_task.scheduler_task.cache_user_func',#导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule':timedelta(minutes=1),#每1分钟执行一次,将用户消息缓存到cache中}
}'''
配置:也可以使用下面这种方式:
app.conf.update(task_serializer='json',accept_content=['json'],  # Ignore other contentresult_serializer='json',timezone='Asia/Shanghai',enable_utc=False,
)
'''

2.2、celery_task/async_task.py

# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()# 导入celery对象app
from celery_task.celery import app
# 导入django自带的发送邮件模块
from django.core.mail import send_mail
import threading
from study_celery import settings
'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task
'''#没有返回值,禁用掉结果后端
@app.task
def send_email_task(email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 启用线程发送邮件,此处最好加线程池t = threading.Thread(target=send_mail,args=("登录前获取的验证码",  # 邮件标题'点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址[email],  # 接收邮件的邮件地址,可以写多个),# html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中kwargs={'html_message': f"<p></p> <p>验证码:{code}</p>"})t.start()return {'email':email,'code':code}

2.3、celery_task/scheduler_task.py

# 因为需要用到django中的内容,所以需要配置django环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "study_celery.settings")
import django
django.setup()from celery_task.celery import app
from apps.user.views import models as user_models
from django.core.cache import cache
import time
from django.forms import model_to_dict#有返回值,返回值可以从结果后端中获取
@app.task
def add_func(a,b):print('执行了加法函数')cache.set('add_ret',{'time':time.strftime('%Y-%m-%d %H:%M:%S'),'ret':a+b})return a+b#不需要返回值,禁用掉结果后端
@app.task(ignore_result=True)
def cache_user_func():user = user_models.UserModel.objects.all()user_dict = {}for obj in user:user_dict[obj.account] = model_to_dict(obj)cache.set('all-user-data',user_dict,timeout=35*60)

2.4、utils/check_task.py

from celery.result import AsyncResult
from celery_task.celery import app
'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY   :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=app)dic = {'type':result.status,'msg':'','data':'','code':400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg']='任务重新尝试执行'elif result.status =='FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic

四、apps/user中配置相关处理视图

4.1、基本配置

1、study_celery/settings.py

#cache缓存
CACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://127.0.0.1:6379","OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient","CONNECTION_POOL_KWARGS": {"max_connections": 100}# "PASSWORD": "123",},'TIMEOUT':30*60 #缓存过期时间}
}#邮件配置
# EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '2414155342@qq.com'  # 发送邮件的邮箱帐号
EMAIL_HOST_PASSWORD = 'qq邮箱的授权码'  # 授权码,各邮箱的设置中启用smtp服务时获取
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 这样收到的邮件,收件人处就会这样显示
# DEFAULT_FROM_EMAIL = '<'xxxxx@qq.com>'
EMAIL_USE_SSL = True   # 使用ssl
# EMAIL_USE_TLS = False # 使用tls
# EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True

2、study_celery/urls.py

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('user/',include('apps.user.urls'))
]

3、apps/user/urls.py

from django.urls import path
from . import viewsurlpatterns = []

4.2、user的models

apps/user/models.py

from django.db import models
# Create your models here.class UserModel(models.Model):id = models.AutoField(primary_key=True)name = models.CharField(max_length=32)account = models.CharField(max_length=64)password = models.CharField(max_length=256)email = models.EmailField()

执行数据库迁移命令

python manage.py makemigrations

python manage.py migrate

4.3、user的视图函数

1、apps/user/views.py

from django.contrib.auth.hashers import make_password, check_password
from django.views import View
from django.http import JsonResponse
from . import models
from django.core.cache import cache
import time
from celery_task.async_task import send_email_task
# Create your views here.class ResgiterView(View):#注册用户def post(self,request):name = request.POST.get('name')account = request.POST.get('account')password = request.POST.get('password')email = request.POST.get('email')obj = models.UserModel.objects.filter(account=account).first()if obj:return JsonResponse({'code':400,'msg':'账户已经存在了'})password = make_password(password)instance = models.UserModel.objects.create(name=name,account=account,password=password,email=email)return JsonResponse({'code':200,'msg':'注册用户成功'})class LoginView(View):#用户登录def post(self,request):account = request.POST.get('account')password = request.POST.get('password')code = request.POST.get('code') #验证码email_code = cache.get(f'email_{account}')#发给邮箱的验证码.get(f'email_{account})print(code,email_code)if code and email_code:if code != email_code:return JsonResponse({'code':400,'msg':'验证码错误'})else:return JsonResponse({'code':400,'msg':'请先点击发送邮件获取验证码'})obj = models.UserModel.objects.filter(account=account).first()if not obj:return JsonResponse({'code':400,'msg':'当前用户不存在'})pwd_true = check_password(password,obj.password)response = JsonResponse({'code':200,'msg':'登录成功'})if pwd_true:return responseelse:return JsonResponse({'code':400,'msg':'用户名或密码错误'})class LoginSendEmailView(View):#用户登录前,需要验证码,发送验证码给用户的邮箱def post(self,request):account = request.POST.get('account')email = request.POST.get('email')code = str(time.time())[-5:]cache.set(f'email_{account}',code)print(cache.get(f'email_{account}'))res = send_email_task.delay(email,code)task_id = res.idprint('验证码是',code)return JsonResponse({'code':200,'msg':f'请查看{email}邮箱中是否收到邮件','task_id':task_id})class AllUserDataView(View):#查询cache_user_func定时任务执行时存到cache中的用户数据def get(self,request):key = 'all-user-data'data = cache.get(key)if data:return JsonResponse({'code':200,'data':data})else:return JsonResponse({'code':400,'msg':'没有相关数据'})class AddFuncDataView(View):#查询add_func 定时任务执行时存到cache中的数据def get(self, request):data = cache.get('add_ret')print(data, type(data))return JsonResponse({'code': 200, 'data': data})class UserTaskIdGetDataView(View):def get(self,request):from utils.check_task import check_task_status#检查任务是否成功了,获取任务的返回值task_id = request.GET.get('task_id')if not task_id:return JsonResponse({'code':400,'msg':'没有携带任务id'})ret = check_task_status(task_id)return JsonResponse(ret)

2、apps/user/urls.py

from django.urls import path
from . import viewsurlpatterns = [path('login/',views.LoginView.as_view(),name='user-login'),#登录path('register/', views.ResgiterView.as_view(), name='user-register'),#注册path('login/code/',views.LoginSendEmailView.as_view(),name='user-login-send-email'),#登录验证码path('all/user/data/',views.AllUserDataView.as_view(),name='user-all-user-data'),#获取定时任务cache_user_func缓存到cache中的用户数据path('add/result/',views.AddFuncDataView.as_view(),name='user-add-result'),#获取定时任务add_func缓存到cache的计算结果path('task-id/result/',views.UserTaskIdGetDataView.as_view(),name='user-task-id-data'),#通过任务的task-id获取到任务返回值
]

五、调用函数测试

5.1、启动项目

1、启动django项目

python manage.py runserver

2、启动celery异步

#windows系统

celery -A celery_task worker -l info  -P  eventlet

#linux系统

celery -A celery_task worker -l info 

3、启动celery定时(定时任务也是提交给异步的)

celery -A celery_task beat -l info

5.2、测试项目:Postman接口工具

1、注册:url= /user/register/

2、登录前点击获取验证码:返回任务id, url=/user/login/code/

 把task_id=17ee8389-14ab-4da1-b88a-56446afb4493 复制下来,4中可以用到

3、登录:code是发送步骤2中发送给邮箱的验证码 ,url=/user/login/

4、通过任务id,获取到发送邮箱异步任务的返回值,url=/user/task-id/result/

复制2返回值中的task_id,获取该任务的返回值

5、获取定时任务中,add_func缓存在cache中的计算数据

6、获取定时任务中,cache_user_func缓存到cache中的用户数据

 总结:

1、异步任务,一般是保存文件、发送邮件、耗时操作等

2、定时任务,定时处理某些数据。

六、报错

1、先去celery.py中查看定时任务的配置,模块的路径是不是有问题,千万不要多了一个空格啥的

2、如果通过任务id获取任务的返回值不成功,看看是不是添加@app.task(ignore_result=True),如果是就去掉这个参数配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/66367.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

maven的入门使用

maven的入门使用 1.Maven&#xff08;Maven Apache&#xff09;是一个流行的项目构建和管理工具&#xff0c;2.项目结构和POM文件&#xff1a;3.POM文件&#xff08;Project Object Model&#xff09;4.依赖管理&#xff1a; 在POM文件中5.生命周期和构建过程1.前言2.插件系统3…

MySQL语句总和之MySQL数据库与表结构操作

目录 1、启动MySQL服务 2、进入MySQL数据库 3、退出数据库 4、查看MySQL数据库所有库 5、创建、删除、使用、查看所处库操作 6、创建表 7、查看表结构 8、表结构操作 1&#xff09;修改表名 2&#xff09;自增长操作 3)添加一个address字段放在Phone字段后面 4)添加…

c++ 有元

友元分为两部分内容 友元函数友元类 友元函数 问题&#xff1a;当我们尝试去重载operator<<&#xff0c;然后发现没办法将operator<<重载成成员函数。因为cout的输出流对象和隐含的this指针在抢占第一个参数的位置。this指针默认是第一个参数也就是左操作 数了。…

LeetCode 36题:有效的数独

题目 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考示例图&#xff…

【JavaEE进阶】Spring 更简单的读取和存储对象

文章目录 一. 存储Bean对象1. 配置扫描路径2. 添加注解存储 Bean 对象2.1 使用五大类注解存储Bean2.2 为什么要有五大类注解&#xff1f;2.3 有关获取Bean参数的命名规则 3. 使用方法注解储存 Bean 对象3.1 方法注解储存对象的用法3.2 Bean的重命名3.3 同⼀类型多个 Bean 报错 …

Vue前端 更具router.js 中的meta的roles实现路由卫士,实现权限判断。

参考了之篇文章 1、我在登陆时获取到登录用户的角色名roles&#xff0c;并存入sessionStorage中&#xff0c;具体是在login页面实现&#xff0c;还是在menu页面实现都可以。在menu页面实现&#xff0c;可以显得登陆快一些。 2、编写router.js&#xff0c;注意&#xff0c;一个…

Maven 基础之依赖管理、范围、传递、冲突

文章目录 关于依赖管理坐标和 mvnrepository 网站pom.xml 中"引"包 依赖范围依赖传递依赖冲突 关于依赖管理 坐标和 mvnrepository 网站 在 maven 中通过『坐标』概念来确定一个唯一确定的 jar 包。坐标的组成部分有&#xff1a; 元素说明<groupId>定义当前…

【C/C++】用return返回一个函数

2023年8月13日&#xff0c;周日早上 我的第一篇使用了动态图的博客 #include<iostream> #include<windows.h>int loop(){int i0;while(1){Sleep(1000);std::cout<<i<<std::endl;}return 1; }int main(){std::cout<<"程序开始"<<…

tp5中的事务处理

使用事务首先要数据库支持事务&#xff1b; 如下MySQL数据库user表开启事务支持&#xff0c;即设计表->引擎设置为InnoDB->保存 事务处理 1. 数据库的表引擎需要是 InnoDB 才可以使用&#xff0c;如果不是调整即可&#xff1b; 2. 事务处理&#xff0c;需要执行多个 SQ…

ORB-SLAM2学习笔记7之System主类和多线程

文章目录 0 引言1 整体框架1.1 整体流程 2 System主类2.1 成员函数2.2 成员变量 3 多线程3.1 ORB-SLAM2中的多线程3.2 加锁 0 引言 ORB-SLAM2是一种基于特征的视觉SLAM&#xff08;Simultaneous Localization and Mapping&#xff09;系统&#xff0c;它能够从单个、双目或RBG…

K8S系列二:实战入门

I. 配置kubectl 1.1 什么是kubectl&#xff1f; 官方文档中介绍kubectl是&#xff1a; Kubectl 是一个命令行接口&#xff0c;用于对 Kubernetes 集群运行命令。Kubectl的配置文件在$HOME/.kube目录。我们可以通过设置KUBECONFIG环境变量或设置命令参数–kubeconfig来指定其他…

优雅地处理RabbitMQ中的消息丢失

目录 一、异常处理 二、消息重试机制 三、错误日志记录 四、死信队列 五、监控与告警 优雅地处理RabbitMQ中的消息丢失对于构建可靠的消息系统至关重要。下面将介绍一些优雅处理消息丢失的方案&#xff0c;包括异常处理、重试机制、错误日志记录、死信队列和监控告警等。…