redis + celery

首先,部署Redis数据库:

先下载包:

wget http://download.redis.io/releases/redis-5.0.7.tar.gz

解压redis包:

tar -xvf redis-5.0.7.tar.gz

编译:

make 

sudo make install   (这样没有指定安装目录)

 

# 注意,redis默认安装路径:/usr/local/bin,这样其实挺好的,并不需要折腾,其实准确

# 的来说,当执行完两个make之后,就会在redis包下的src目录下生成所有必要文件,同

# 时,将一些可执行文件扔一份到 /usr/local/bin 当然,如果不想将这些可执行二进制文件

# 扔到 /usr/local/bin,可以自行指定位置,安下面命令执行即可

 

sudo make PREFIX=/usr/local/redis install  (指定redis的安装目录)

 安装完成后长这样:

c813fb02320940b59969d98bc26316d4.png

将redis配置文件复制到bin目录下(先新建文件夹然后再将redis配置文件coyp进去)

我们要将配置文件复制一份,我们以后就是用这个配置文件来启动。

cd /usr/local/bin

sudo mkdir redis_config

# 回到安装redis目录,因为redis.conf文件在这里

c160d40048b34cf4abdebf52dbae7cc2.png

sudo cp redis.conf /usr/local/bin/redis_config

 

接下来修改配置文件:

vi /usr/local/bin/redis_config/redis.conf

这里有几个地方需要注意:

第一个地方,绑定地址,允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0

0c773ee90a4e4414a8c7595978345929.png

 

第二个地方,设置守护,守护进程,修改为yes后即可后台运行

e879e96c7b3d4521888df5b8a355bb7e.png

 

第三个地方,设置密码,设置后访问Redis必须输入密码。这里注意,redis没有用户名一说,只有服务地址和密码,密码还可以不给,不像postgres等数据库,需要严格的身份验证。

57012d8d626849f59bff844220fa727b.png

其他的,基本不太动...

# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"

接下来,启动redis:

cd /usr/local/bin

redis-server redis_config/redis.conf

1c4c0444649444f29996dd394d1afcf4.png

启动客户端:

cd /usr/local/binredis-cli -h 127.0.0.1 -p 6379

设置Redis开机自启动

首先,新建一个系统服务文件:

vi /etc/systemd/system/redis.service

文件内容为:

[Unit]
Description=redis-server
After=network.target[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/bin/redis_config/redis.conf
PrivateTmp=true[Install]
WantedBy=multi-user.target

这里其他的没啥,注意这个参数 ExecStart,填对就行。

systemctl daemon-reload 

现在,我们可以用下面这组命令来操作redis了:

# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis

执行下面的命令,可以让redis开机自启:

systemctl enable redis

 

好了,接下来,开始扯 celery 

Celery 是一款简单灵活可靠的分布式任务执行框架,支持大量任务的并发执行。

Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。

8e6f2b706cac40efba4eb17a07be3efa.png

  • 提交任务给 Broker 队列
  • 如果是异步任务,Worker 会立即从队列中取出任务并执行,执行结果保存在 Backend 中
  • 如果是定时任务,任务由 Celery Beat 进程周期性地将任务发往 Broker 队列,Worker 实时监视消息队列获取队列中的任务执行

应用场景

  • 长时间任务的异步执行, 如上传大文件
  • 实时任务执行,支持集群部署,如支持高并发的机器学习推理
  • 定时任务执行,如定时发送邮件

节点总结:

到这里,我先记录一些理解。

首先,celery它是一个典型的生产者消费者模型。也就是说,这个模型里,可以没有生产者,但是必须得有消费者。

其次,这里先了解2个命令:

celery -A tasks worker --loglevel=info --pool=solo

celery -A proj.period_task beat -l info

这里面出现了一个 worker, 一个 beat。worker 就是消费者的意思,beat 是指周期任务。

2个命令很像,但是意思完全不一样。celery beat -A ...  是说,周期的向队列里放入任务。而celery worker -A ... 是说,一旦队列里有任务,就立刻去执行任务。所以,beat 就属于生产者,而 worker 属于消费者。如果没有 worker 那么任务只会堆积,没人处理。因此,使用celery 一定得启动 worker。

第三,selery跟我们django服务里面自定义的app一样,它本身也是一个app。

安装
本文使用 Redis 作为 Broker 即消息队列

pip install celery
pip install redis

需要持久化任务的话,Broker 使用 RabbitMQ 并设置持久化队列。
官方建议生产环境首选 RabbitMQ ,突然停止或断电 Redis 可能会数据丢失。

 

Celery 的开发主要有四个步骤:

  1. 实例化 Celery
  2. 定义任务
  3. 启动任务 Worker
  4. 调用任务

先看一个简单的celery实现:

from celery import Celery# broker 是用于存储任务的队列,backend 是用于存储任务执行结果的队列
test_celery = Celery('test', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1')
# 也可以以这种方式,导入任务模块,当然,这里作为最简单的例子,是不需要的
app.conf.imports = ['tasks']@test_celery.task
def my_add(a, b):return a + b

这样,一个最简单的最基本的 celery 就已经完成了。现在已经构建了一个 celery 的异步任务。但是光有任务是没有用的,首先得有消费者,就是我之前写的小结里记录的,celery 必须得有worker,然后,在搞一个生产者,将任务放到队列里,然后,自然会有worker去执行任务,代码也就会被执行了。

启动任务 Worker

celery -A my_celery worker -l info -c 4

这里千万注意,worker 的位置:

e388c599cf724c2b8e6f20806d1e9305.png

新版本,worker不让写前面了。

连接成功后,长这样子...

316260c61e4348c98e532c3c69f2af0c.png

到这里,celery 的消费者就搞定了,然后是生产者...其实生产者无非就2种,一次性的,循环性的。但是其本质又都是一样的,就是给 task 到任务队列完事。一次性的就是只触发一次将 task 压入队列,周期性的就是间隔的将 task 压入队列。

现在构建生产者,最简单的,就是这样,直接弄一个,然后,执行这个文件即可。

from my_celery import my_addmy_add.delay(1, 2)

看下结果:

81e1b6bea5174a0ea122a6df0df6ab82.png

这是刚起的 celery 的 worker 的执行结果...

6bff08848da64ef6b8e3fd9b70963651.png

换一种写法:

from my_celery import my_addmy_add.delay(1, 2)# 使用签名模式,得到的是一个新的 task, 这种 task 可以跨越进程被调用
new_task = my_add.s(1, 2)
new_task.delay()

结果就成了:

40f31ad18b5d490ebe4a4719ac3c23b0.png

这是两种写法,先不做,讨论,一会在说。

就现在为止,基本上,我们就已经可以使用selery做事情了。尤其是在django服务中,完全可以搞一个 url 配合视图函数做任务触发,就可以利用celery做异步任务。

上面是个简单的实现,通常情况,都是写出配置文件来用的,会显得规范一些。目录结构通常是这样的。构建一个celery app的文件夹,让它和 manage.py 在同一级目录。

bbb4cedffc494485b4cc37f5e9e7ad4e.png

所有的 task 都可以放到 tasks.py 中,celery 的实例化对象可以放到 __init__.py 中,相关的配置可以放到 config.py 中。

__init__.py

from celery import Celery
from celery.schedules import crontabtest_celery = Celery('test')
# 加载配置文件
test_celery.config_from_object('my_celery.config')# 添加周期任务,在没有调度的时候,周期任务是不会执行的,只有通过周期调度命令启动的时候
# 它们才会被执行
test_celery.conf.beat_schedule = {'test001': {'task': 'my_celery.tasks.my_add',# 每周一07:30执行my_add任务'schedule': crontab(minute='30', hour='7', day_of_week='1'),'args': (1, 3)},'test002': {'task': 'my_celery.tasks.my_add',# 每分钟执行一次 my_add 任务'schedule': crontab(minute='*/1'),'args': (1, 3)},
}

config.py 

注意:这里一定得记着导入模块,因为如果这里没写导入任务模块,那么就会导致任务模块里的任务统统没被注册,那就无法使用。

BROKER_URL = 'redis://127.0.0.1:6379/0'  # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'  # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区配置
CELERY_IMPORTS = (  # 导入的任务模块'my_celery.tasks',
)

tasks.py

from my_celery import test_celery@test_celery.task
def my_add(a, b):return a + b

test.py

from my_celery.tasks import my_addmy_add.delay(1, 2)# 定时任务,延时3秒执行
my_add.apply_async((3, 4), countdown=3)new_task = my_add.s(5, 6)
new_task.delay()

普通的调用和之前的简单模式一样,这次看下周期调用:

celery -A my_celery beat -l info

这里有第二种写法,其意义在于,周期模式是需要记录时间的,因此,可以指定一个地方让其记录时间。 

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

结果:

每隔1分钟将task压入队列

1fd5234c5e0c4fa09c1bfa7c9d4a5cf9.png

每隔1分钟,worker就能获取到task,并执行它 

b059de3b3dd84afa94d229ac44538a1b.png

到这里,基本的celery,就搞定了,可以使用了....

另外的一些花哨的用法,记录下:

调用任务

常规任务

  • delay():直接调用任务,是 apply_async() 的封装
  • apply_async():通过发送异步消息调用任务,可指定倒计时 countdown ,执行时间 eta ,过期时间 expires 等参数
  • signature():创建签名,可传递任务签名给别的进程使用,或作为其他函数的参数
  • s():创建签名的快捷方式
from my_celery.tasks import my_addresult = my_add.delay(1, 2)  # 直接调用
print(result.get())result = my_add.apply_async((1, 2), countdown=2)  # 2s后执行
print(result.get())t1 = my_add.signature((1, 2), countdown=2)  # 签名Signatures,可传递任务签名给别的进程使用,或作为其他函数的参数
result = t1.delay()
print(result.get())t1 = my_add.s(1, 2).set(countdown=2)  # 创建签名的快捷方式
result = t1.delay()
print(result.get())

组合任务

  • group():组合,接受一个可并行调用的任务列表
  • chain():串联,将签名连接在一起,一个接一个调用(前一个签名的结果作为下一个签名的第一个参数)
  • chord():和弦,类似 group() 但包含回调,在所有任务执行完后再调用任务
  • map():将参数列表应用于该任务
  • starmap():将复合参数列表应用于该任务
  • chunks():将一个很长的参数列表分块成若干部分

任务状态跟踪

这种情况,需要对 __init__.py 做出一定的修改,添加一些内容即可

from celery import Celery, Task
from celery.schedules import crontab
from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)  # 日志test_celery = Celery('test')
test_celery.config_from_object('my_celery.config')test_celery.conf.beat_schedule = {'test001': {'task': 'my_celery.tasks.my_add','schedule': crontab(minute='30', hour='7', day_of_week='1'),'args': (1, 3)},'test002': {'task': 'my_celery.tasks.my_add','schedule': crontab(minute='*/1'),'args': (1, 3)},
}class TaskMonitor(Task):def on_success(self, retval, task_id, args, kwargs):"""success时回调"""logger.info('task id:{} , arg:{} , successful !'.format(task_id, args))def on_retry(self, exc, task_id, args, kwargs, einfo):"""retry时回调"""logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))def on_failure(self, exc, task_id, args, kwargs, einfo):"""failure时回调"""logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))

然后,再修改下 tasks.py

from my_celery import test_celery, TaskMonitor@test_celery.task
def my_add(a, b):return a + b@test_celery.task(base=TaskMonitor)
def my_add_1(a, b):return a + b

命令行参数:

关于 celery 命令行的启动等参数,都在这了

参数含义全称
-A指定模块 
-l日志level–loglevel
-c进程数–concurrency
-Q指定队列–queue
-B周期性任务–beat
-P池的实现–pool

 

搭建redis:

Redis基础——1、Linux下安装Redis(超详细)_linux安装redis_原首的博客-CSDN博客

使用redis:

https://www.cnblogs.com/fuminer/p/17254164.html

celery的使用:

Python定时任务库Celery——分布式任务队列_python celery_XerCis的博客-CSDN博客

 

其他参考,总之,看到的帖子,都有错误之处,往往不能让我通达,故写此贴:

https://www.cnblogs.com/clark1990/p/17174251.html

Periodic Tasks — Celery 5.3.5 documentation

https://docs.celeryq.dev/en/stable/reference/cli.html

Python定时任务库Celery——分布式任务队列_python 使用分布式消息系统celery实现定时任务 自动执行python 脚本_XerCis的博客-CSDN博客

Python-Celery定时任务、延时任务、周期任务、crontab表达式及清除任务的基本使用与踩坑 - 知乎

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/188949.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs+vue面向中小学课堂教学辅助软件系统的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

主要功能有,管理员通过后台会对此教学辅助进行审核,管理员在还可以进行首页、个人中心、学生管理、教师管理、班级信息管理、科目名称管理、课程信息管理、教学资料管理、作业信息管理、作业提交管理、作业成绩管理、在线考试管理、试题管理、考试管理、…

为什么要使用动态代理IP?数据采集使用动态代理有哪些优势?

随着互联网的普及,数据采集已经成为企业、营销人员和数据分析师的重要工作之一。然而,在采集数据的过程中,经常会遇到一些问题,如IP被封禁、访问受限等。为了解决这些问题,动态代理IP应运而生。那么,为什么…

Stable Diffusion进阶玩法说明

之前章节介绍了Stable Diffusion的入门,介绍了文生图的魅力,可以生成很多漂亮的照片,非常棒 传送门: Stable Diffusion新手村-我们一起完成AI绘画-CSDN博客 那我们今天就进一步讲讲这个Stable Diffusion还能做些什么, …

腾讯云4核8G服务器配置价格表,轻量和CVM标准型S5实例

腾讯云4核8G服务器S5和轻量应用服务器优惠价格表,轻量应用服务器和CVM云服务器均有活动,云服务器CVM标准型S5实例4核8G配置价格15个月1437.3元,5年6490.44元,轻量应用服务器4核8G12M带宽一年446元、529元15个月,腾讯云…

Redis(列表List)

使用LPUSH从头部添加元素,可以一次添加一个或多个。 使用LRANGE 查看列表中的数据,0表示起始位置,-1表示结束位置。 当然也可以使用RPUSH来从尾部添加元素。 可以使用RPOP从尾部删除元素,会返回删除的元素的值。 同理使用LPOP…

瑞吉外卖Day06

1.用户地址 1.1实体类 /*** 地址簿*/ Data public class AddressBook implements Serializable {private static final long serialVersionUID 1L;private Long id;//用户idprivate Long userId;//收货人private String consignee;//手机号private String phone;//性别 0 女…

【完美世界】石昊身上宝术至尊骨、上苍之手和轮回宝术哪个最强

Hello,小伙伴们,我是小郑继续为大家深度解析国漫资讯。 完美世界动画中,石昊通过举起天人族的镇教之宝飞仙石,终于补全了第一块至尊骨的天赋宝术-上苍之手。然而,这只是开始,上苍之手的终极奥义还需要他慢慢领悟。 在…

智能指针面试题

智能指针被问到的概率还是很大的,特别是Shared_ptr,最好会手撕,亲身经历! 基本概念 1. RAll RAII(Resource Acquisition Is Initialization)是一种利用对象生命周期来控制程序资源(如内存、文…

简单介绍二分类问题评价指标

正确率(Accuracy) Accuracy ​(TP TN)/(TP TN FP FN)精准率(Precision) 记忆:在识别出某标签中正确的比例; 比如识别为某标签的一共有105个,其中有95个是识别对的,那Precision就是95/105; TP/(TPFP)召回率(Recall…

python趣味编程-5分钟实现一个Tic Tac Toe游戏(含源码、步骤讲解)

The Tic Tac Toe In Python是用 Python 编程语言编写的,这个Tic Tac Toe Game In Python是一个简单的基于 GUI 的策略游戏板,非常容易理解和使用。 所有的游戏规则都是一样的,就像我们玩实时井字棋一样,这是一个简单的多人游戏。 Python 中的 Tic Tac Toe 游戏:项目信息 …

Python----图像的手绘效果

图像的数组表示 图像是有规则的二维数据,可以用numpy 库将图像转换成数组对象 : from PIL import Image import numpy as np imnp.array(Image.open("D://np.jpg")) print(im.shape,im.dtype)结果: 图像转换对应的ndarray 类型是3 维数据&am…

什么是CDN?什么是安全加速CDN?有什么优势?

安全加速CDN(Content Delivery Network)是一种网络架构,它通过在全球范围内部署服务器并缓存静态和动态内容来提供更快的Web页面加载和更好的用户体验。安全加速CDN可以保护网站免受DDoS攻击、恶意软件和其他安全威胁,从而提高网站的可用性和稳定性。它通…