Flask框架配置celery-[1]:flask工厂模式集成使用celery,可在异步任务中使用flask应用上下文,即拿即用,无需更多配置

一、概述

1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

注意:

在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

二、项目结构

依赖环境:

celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2

目录结构:

flask-project

        |--apps

                |-- user

                        |-- models

                        |--views.py

                        |--urls.py

                |--__init__.py

        |--ext

                |--__init__.py

                |--config.py

        |--celery_task

                |--__init__.py

                |--async_task.py

                |--celery.py

                |--celeryconfig.py

                |--check_task.py

                |--scheduler_task.py

        app.py

三、flask工厂模式下各模块功能

1、apps/user/models.py : 写了一个user表

2、apps/user/views.py:写了测试调用celery异步任务的接口

3、apps/user/urls.py: 注册路由的

4、ext/__init__.py:cache、db、cors的拓展

5、ext/config.py : cache和cors使用到的配置

6、apps/__init__.py: 一个函数create_app,生成flask应用对象

7、app.py: 启动flask应用对象的模块

本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

在视图中在执行异步任务,并获取异步任务的id:

from celery_task.async_task import send_email_task,cache_user_task
#用户资源:get\put\delete, 对单个进行操作
class UserOneResource(ResourceBase):def put(self,id):#测试异步发邮件email = request.args.get('email')code = request.args.get('code')res = send_email_task.delay(email,code)print(res.id)return NewResponse(msg='put',data={'task_id':res.id})def patch(self,id):#测试异步操作flask的orm和cachep = request.args.get('p')if p=='set':res = cache_user_task.delay()print(res,type(res))return NewResponse(msg='patch',data={'task_id':res.id})else:from ext import cachedata = cache.get('all-user-data')return NewResponse(msg='patch',data=data)

res = 异步函数.delay(函数需要的参数)

task_id = res.id

注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

四、celery项目的配置

1、celery的配置

将celery的配置都放到一个py文件中,方便后期的维护和使用

celeryconfig.py

from celery.schedules import crontab
from datetime import timedelta
'''
参数解析:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
result_expires: 异步任务结果存活时长
beat_schedule:设置定时任务
'''
#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串
task_module = ['celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]#celery的配置
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}

2、创建celery对象

celery.py

from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)'''
2、创建celery应用对象'task'可以任务是该celery对象名字,用于区分celery对象broker是指定消息中间件backend是指定任务结果存储位置include是手动指定异步任务所在的模块的位置
'''
#创建celery异步对象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#导入一些基本配置
celery.conf.update(**config)'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'
class ContextTask(celery.Task):def __call__(self, *args, **kwargs):from apps import create_appapp = create_app()with app.app_context():return self.run(*args, **kwargs)
celery.Task = ContextTask

注意:

1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

2、第二步是创建celery通用的方法了,没什么好说的。

3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

3、异步任务模块

将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

async_task.py

# 导入celery对象app
from celery_task.celery import celery
from ext import cache
import time'''
1、没有返回值的,@app.task(ignore_result=True)
2、有返回值的任务,@app.task 默认就是(ignore_result=False)
'''# 没有返回值,禁用掉结果后端
@celery.task
def send_email_task(receiver_email,code):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作''':param email: 接收消息的邮箱,用户的邮箱:return:'''# 模拟邮件发送验证码time.sleep(5)return {'result':'邮件已经发送',receiver_email:'2356'}@celery.task
def cache_user_task():#orm查询数据,放到cache中from apps.user.models import UserModeluser = UserModel.query.all()lis = []for u in user:id = u.idname = u.namedic = {'id':id,'name':name}lis.append(dic)print(dic)cache.set('all-user-data',lis)return {'code':200,'msg':'查询数据成功'}

4、定时任务模块

将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

schedulser_task.py

from celery_task.celery import celery
import time# 有返回值,返回值可以从结果后端中获取
@celery.task
def add_func(a, b):print('执行了加法函数',a+b)return a + b# 不需要返回值,禁用掉结果后端
@celery.task(ignore_result=True)
def cache_user_func():print('all')

5、检测任务id获取任务状态和返回值

check_task.py:

from celery.result import AsyncResult
from celery_task.celery import celery'''验证任务的执行状态的'''def check_task_status(task_id):'''任务的执行状态:PENDING :等待执行STARTED :开始执行RETRY   :重新尝试执行SUCCESS :执行成功FAILURE :执行失败:param task_id::return:'''result = AsyncResult(id=task_id, app=celery)dic = {'type': result.status,'msg': '','data': None,'code': 400}if result.status == 'PENDING':dic['msg'] = '任务等待中'elif result.status == 'STARTED':dic['msg'] = '任务开始执行'elif result.status == 'RETRY':dic['msg'] = '任务重新尝试执行'elif result.status == 'FAILURE':dic['msg'] = '任务执行失败了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任务执行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 将结果删除# async.revoke(terminate=True)  # 无论现在是什么时候,都要终止# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。return dic

在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

五、测试

1、运行项目

flask项目(在项目根目录下执行):

        flask run --host 0.0.0.0 --port 5000

celery项目(在项目根目录下执行):

启动celery进程:

windows系统:

        celery -A celery_task.celery worker -l info  -P  eventlet

linux系统:

        celery -A celery_task.celery worker -l info 

启动定时任务(先启动celery进程在启动定时任务):

celery -A celery_task.celery beat -l info

2、运行结果

1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项

1、在系统中要先安装好redis和mysql,并都启动了

2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

3、当前的配置下,celery的目录必须是在flask根目录下

七、拓展-改变celery_task的位置

如果你想将celery_task包移动到apps包下,此时你需要修改什么?

1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变

'1、把flask项目路径添加到系统环境变量中'
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化

'1、加上apps.'
task_module = ['apps.celery_task.async_task',  # 写任务模块导入路径,该模块主要写异步任务的方法'apps.celery_task.scheduler_task',  # 写任务模块导入路径,该模块主要写定时任务的方法
]'2、task参数对应的字符串,加上apps.'
config = {"broker_url" :'redis://127.0.0.1:6379/0',   #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定时任务配置# 名字随意命名'add-func-30-seconds': {# 执行add_task下的addy函数'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# 每10秒执行一次'schedule': timedelta(seconds=30),# add函数传递的参数'args': (10, 21)},# 名字随意起'add-func-5-minutes': {'task': 'apps.celery_task.scheduler_task.add_func',  # 任务函数的导入路径,from celery_task.scheduler_task import add_func# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务'schedule': crontab(minute='5'),  # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行'args': (19, 22)  # 定时任务需要的参数},# 缓存用户数据到cache中'cache-user-func': {'task': 'apps.celery_task.scheduler_task.cache_user_func',# 导入任务函数:from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1),  # 每1分钟执行一次,将用户消息缓存到cache中}}
}

3、在视图函数导入异步任务的路径也变了

#异步任务
from apps.celery_task.async_task import send_email_task,cache_user_task

4、启动celery和定时任务的命令变量【在项目根目录下执行命令】

启动celery:

windows启动命令: celery  -A  apps.celery_task.celery worker -l info  -P  eventlet

linux启动命令: celery  -A  apps.celery_task.celery worker -l info 

启动定时任务:

celery -A apps.celery_task beat -l info

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/133249.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

go mod 使用三方包、go get命令

一、环境变量设置 go env -w GO111MODULEon go env -w GOPROXYhttps://goproxy.cn,https://goproxy.io,direct 二、goland开启 go mod 三、go mod 使用 在go.mod文件中声明三方包地址&版本号即可,如下: 开发工具goland会自动解析go.mod文件&#x…

股票价格预测 | Python基于RNN及股票预测实战

循环神经网络(RNN)是基于序列数据(如语言、语音、时间序列)的递归性质而设计的,是一种反馈类型的神经网络,其结构包含环和自重复,因此被称为“循环”。它专门用于处理序列数据,如逐字生成文本或预测时间序列数据(例如股票价格)。 (1)one to one:其实和全连接神经网络…

基于黏菌优化的BP神经网络(分类应用) - 附代码

基于黏菌优化的BP神经网络(分类应用) - 附代码 文章目录 基于黏菌优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.黏菌优化BP神经网络3.1 BP神经网络参数设置3.2 黏菌算法应用 4.测试结果:5.M…

云计算到底牛x在哪里?

你们好,我的网工朋友。 云计算已经霸屏行业有段时间了,但很多粉丝朋友还是不太明白什么是云计算,为什么要学云计算。 从宏观来说,其实云计算的优点很多。 就和传统模式相比,云计算在六个维度都有显著的提升点。 比…

C++算法:图中的最短环

题目 现有一个含 n 个顶点的 双向 图,每个顶点按从 0 到 n - 1 标记。图中的边由二维整数数组 edges 表示,其中 edges[i] [ui, vi] 表示顶点 ui 和 vi 之间存在一条边。每对顶点最多通过一条边连接,并且不存在与自身相连的顶点。 返回图中 …

ios safari 浏览器跳转页面没有自适应

今天开发遇到了一个问题,当用户点击浏览器中的表单进行注册时,表单元素会放大,随后跳转页面无法还原到初始状态。 这是因为如果 的 font-size 被设定为 16px 或更大,那么 iOS 上的 Safari 将正常聚焦到输入表单中。但是&#xff…

卷积神经网络CNN基础知识

目录 1 前言2 卷积神经网络CNN2.1 LeNet-5相关介绍2.2 CNN基本结构2.2.1 卷积层2.2.2 池化层(下采样层)2.2.3 全连接层2.2.3.1激励层(非线性激活)2.2.3.2 线性层2.2.3.3 Dropout层2.2.3.4 总结 2.3 图像的上采样和下采样2.3.1 上采…

Idea报错 java: 程序包org.springframework.boot不存在 解决方法

发现我的是因为更改了maven的主路径和本地仓库路径,但是新建了一个工程后,设置就恢复默认了。需要重新设置正确路径。 应用后会重新下载依赖项 之后虽然还会报错,但是已经不影响项目运行,配置成功

uCOSIII实时操作系统 六 内部任务(空闲函数)

目录 空闲任务: 时钟节拍任务: 统计任务: 定时任务: 中断服务管理任务: 钩子函数: 空闲任务的钩子函数: 空闲任务钩子函数实验: 其他任务的钩子函数: 空闲任务&a…

Hadoop-2.5.2平台环境搭建遇到的问题

文章目录 一、集群环境二、MySQL2.1 MySQL初始化失败2.2 MySQL启动报错2.3 启动时报不能打开日志错2.4 mysql启动时pid报错 二、Hive2.1 Hive修改core-site.xml文件后刷新权限2.2 Hive启动元数据时报错2.3 Hive初始化MySQL报错2.3.1 报错信息2.3.2 错误原因2.3.3 参考文档 2.4 …

uni-app:本地缓存的使用

uni-app 提供了多种方法用于本地缓存的操作。下面是一些常用的 uni-app 本地缓存方法: uni.setStorageSync(key, data): 同步方式将数据存储到本地缓存中,可以使用对应的 key 来获取该数据。 uni.setStorage({key, data}): 异步方式将数据存储到本地缓存…

The Foundry Nuke 15视频后期合成和特效制作Mac软件

Nuke 15 是一款专业的合成软件,主要用于电影、电视和广告制作中的后期合成和特效制作。 Nuke 15 提供了强大的合成工具和功能,可以对多个图像、视频和3D元素进行无缝融合和合成。它支持多通道图像处理,能够处理高动态范围(HDR&…