Celery分布式异步框架

Celery异步任务框架

"""

1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)

2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

人是一个独立运行的服务 | 医院也是一个独立运行的服务 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

"""

 Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

使用场景

异步执行:解决耗时任务

延迟执行:解决延迟任务

定时执行:解决周期(周期)任务

Celery的安装配置

pip install celery

消息中间件:RabbitMQ/Redis

app=Celery('任务名', broker='xxx', backend='xxx')

两种celery任务结构:提倡用包管理,结构更清晰 

# 如果 Celery对象:Celery(...) 是放在一个模块下的 # 1)终端切换到该模块所在文件夹位置:scripts # 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:模块名随意 # 如果 Celery对象:Celery(...) 是放在一个包下的 # 1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中 # 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet # 注:windows系统需要eventlet支持,Linux与MacOS直接执行:celery worker -A 模块名 -l info # 注:包名随意

Celery执行异步任务

包架构封装

project├── celery_task  	# celery包│   ├── __init__.py # 包文件│   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py│   └── tasks.py    # 所有任务函数├── add_task.py  	# 添加任务└── get_result.py   # 获取结果

基本使用

celery.py
# celery不支持win,所以想再win上运行,需要额外安装eventlet
windows系统需要eventlet支持:pip3 install eventlet
Linux与MacOS直接执行:3.x,4.x版本:celery worker -A demo -l info5.x版本:     celery -A demo worker -l info -P eventlet
tasks.py
from .celery import app
import time
@app.task
def add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.task
def low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
add_task.py
from celery_task import tasks# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')

高级使用

celery.py
# 1)创建app + 任务# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info# 4)获取结果from celery import Celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'low-task': {'task': 'celery_task.tasks.low','schedule': timedelta(seconds=3),# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点'args': (300, 150),}
}
tasks.py
from .celery import appimport time
@app.task
def add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.task
def low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
get_result.py
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')

django中使用

celery.py
"""
celery框架django项目工作流程
1)加载django配置环境
2)创建Celery框架对象app,配置broker和backend,得到的app就是worker
3)给worker对应的app添加可处理的任务函数,用include配置给worker的app
4)完成提供的任务的定时配置app.conf.beat_schedule
5)启动celery服务,运行worker,执行任务
6)启动beat服务,运行beat,添加任务重点:由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
"""# 一、加载django配置环境
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")# 二、加载celery配置环境
from celery import Celery
# broker
broker = 'redis://127.0.0.1:6379/0'
# backend
backend = 'redis://127.0.0.1:6379/1'
# worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'update-banner-list': {'task': 'celery_task.tasks.update_banner_list','schedule': timedelta(seconds=10),'args': (),}
}
tasks.py
from .celery import appfrom django.core.cache import cache
from home import models, serializers
from django.conf import settings
@app.task
def update_banner_list():queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]banner_list = serializers.BannerSerializer(queryset, many=True).data# 拿不到request对象,所以头像的连接base_url要自己组装for banner in banner_list:banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']cache.set('banner_list', banner_list, 86400)return True

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/2260.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringCloud微服务流动资金贷款业务系统设计与实现

一、引言 由于传统的贷款业务系统并不能够顺应时代的变化,同时在一定程度上对业务发展进行了限制,所以为了适应时代的发展,信息贷款业务应该能够被产品化、丰富化,同时还需要制定一套特定的流程来满足新时代用户的需求。流程化的规范管理是当今银行业务发展的必然趋势,研究并开…

Linux--获取一长串目录的结构指令:tree

注意:这个tree指令不是Linux自带的,需要下载 yum install -y tree (-y的作用是免确定) 示例:

风景类Midjourney prompt提示词

稳定输出优美风景壁纸的Midjourney prompt提示词。 1\在夏夜,有淡蓝色的星空,海边,流星,烟花,海滩上全是蓝色的玫瑰和绿色的植物,由Ivan Aivazovsky和Dan Mumford,趋势在cgsociety,…

CSS差缺补漏之《高频面试题----如何使元素水平垂直居中?》

面试中经常会被问到如何使元素水平垂直居中,有哪些方法可以做到? 针对此问题,特意总结如下~ 方法一: 定位(主要是值子绝父相)与margin负值配合----依赖于子元素宽/高 (使用绝对定位或固定定位后&#xff0c…

【C/C++】深拷贝与浅拷贝

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…

探索图像处理的利器——OpenCV

目录 引言&#xff1a; 一、OpenCV简介&#xff1a; 二、OpenCV的特点&#xff1a; 三、OpenCV的应用领域&#xff1a; 四、实际案例&#xff1a; 结论&#xff1a; 引言&#xff1a; 在当今信息化的时代&#xff0c;图像处理已经成为了日常生活中不可或缺的一部分。从社…

sql读取数据直接存成pandas

导包 import pymysql import pandas as pd获取mysql链接 def get_db():#打开数据库连接db pymysql.connect(host*.*.*.*,port3306,user "wws",passwd "yourpasswd",db "youdb")return db db get_db()写sql 读数据保存 sql "select…

2.3、Bean的管理

一、Bean的装配&#xff08;IOC应用实现&#xff09; 创建应用组件之间的协作的行为通常称为装配&#xff08;wiring&#xff09;。Spring IOC通过应用上下文&#xff08;ApplicationContext&#xff09;装载Bean的定义并把他们组装起来。 Spring应用上下文&#xff08;Applica…

windows服务器自带IIS搭建网站并发布公网访问【内网穿透】

文章目录 1.前言2.Windows网页设置2.1 Windows IIS功能设置2.2 IIS网页访问测试 3. Cpolar内网穿透3.1 下载安装Cpolar3.2 Cpolar云端设置3.3 Cpolar本地设置 4.公网访问测试5.结语 转载自远程源码文章&#xff1a;【IIS搭建网站】本地电脑做服务器搭建web站点并公网访问「内网…

面试常问 什么是回表?为什么需要回表?

小伙伴们在面试的时候&#xff0c;有一个特别常见的问题&#xff0c;那就是数据库的回表。什么是回表&#xff1f;为什么需要回表&#xff1f; 索引结构 要搞明白这个问题&#xff0c;需要大家首先明白 MySQL 中索引存储的数据结构。这个其实很多小伙伴可能也都听说过&#xf…

iOS多语言解决方案全面指南

本文以及相关工具和代码旨在为已上线的iOS项目提供一种快速支持多语言的解决方案。由于文案显示是通过hook实现的&#xff0c;因此对App的性能有一定影响&#xff1b;除了特殊场景的文案显示需要手动支持外&#xff0c;其他任务均已实现自动化。 本文中的部分脚本代码基于 Chat…

第三方医药数据供应商有哪些?--数据业务介绍

第三方医药数据供应商主要是为医药企业、健康机构、学术研究、药物研发等提供医药相关数据的收集、整理、分析和应用服务。随着医药市场的需求衍生了许多各高垂直领域的医药数据供应商&#xff0c;这也导致了大家对医药数据供应商涉及领域认识的片面性。 故本文重点介绍各医药…