Celery + redis 异步分布式任务队列安装测试

Celery 异步分布式任务队列

Celery 5.4.0 官方文档

环境:3台 centos7.9 普通用户

redisSchedulerworker
dp951
dp96111
dp971

文章目录

  • Celery 异步分布式任务队列
    • 1、Celery 介绍
    • 2、安装部署
      • 2.1 安装消息中间件(broker)
      • 2.2 安装Celery
    • 3、功能测试
      • 3.1 创建任务
          • 3.1.1 在dp96 机器上创建应用文件 tasks.py
          • 3.1.2 分发应用到dp95、dp97 机器相同位置
      • 3.2 启动worker服务
          • 3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
          • 3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果
      • 3.3 执行分布式异步任务
      • 3.4 查看异步任务结果

1、Celery 介绍

Celery是一个简单、灵活、可靠的分布式系统,基于python开发,可以处理大量的消息,同时提供维护这样一个系统所需的工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度

使用场景

  • 定时任务:定时爬虫、算法模型定时输出
  • 异步任务:I/O密集型任务,消息推送、邮件发送、ai客服
  • 分布式调度:airflow + celery 大数据ETL调度

优点:使用后再说

架构:AMQP(Advanced Message Queuing Protocol)高级消息队列协议

2、安装部署

2.1 安装消息中间件(broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)。官方推荐RabbitMQ和Redis。

RabbitMQ:

  • 优点:支持AMQP(高级消息队列协议),可靠性高,支持消息持久化,有丰富的功能特性(如消息确认、重试、超时、死信队列等)。
  • 缺点:学习曲线较陡峭,配置复杂,性能可能较低。

Redis:

  • 优点:配置简单,性能高,可以用作消息队列用于简单的场景。
  • 缺点:不支持AMQP,不适合重载的消息队列处理,不支持消息的持久化和异步确认。

本次测试安装 redis http://download.redis.io/releases/

# redis 安装
# 解压
[dp96]$ tar -zxvf redis-7.2.4.tar.gz
[dp96]$ cd redis-7.2.4
# 编译
[dp96]$ make
# 安装
[dp96]$ make PREFIX= ~/redis install# 修改配置文件
[dp96]$ vim ./redis.conf
'''
bind * -::*           # 绑定主机地址
protected-mode no     # 保护模式设置为 no,允许外网连接
port 6379             # 监听端口
timeout 0             # 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
daemonize yes         # yes表示启用守护进程,默认是no即不以守护进程方式运行
loglevel notice       # 日志级别
logfile ./redis-server.log  # 指定 Redis 服务器的日志文件路径
dir ./                      # Redis 服务器的工作目录,即数据库文件的存放路径
pidfile /tmp/redis_6379.pid # 进程文件
'''# 启动redis
[dp96]$ cd ~/redis/bin
[dp96]$ ./redis-server ~/opt/redis/redis-7.2.4/redis.conf# 连接验证
[dp96]$ ./redis-cli
127.0.0.1:6379> CONFIG GET *

在这里插入图片描述

# redis 常用操作
# Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)# 切换数据库0-15
127.0.0.1:6379> select 1 
127.0.0.1:6379> scan 0 /keys *       # 查看0号数据库所有的key# 字符串类型
127.0.0.1:6379> set a 'test' 
127.0.0.1:6379> get a           # "test"
127.0.0.1:6379> del a# 哈希类型,适合存储对象
127.0.0.1:6379> hmset a field1 'Hello' field2 'world' 
127.0.0.1:6379> hget a            # (error) ERR wrong number of arguments for 'hget' command        
127.0.0.1:6379> hget a field2     # "world"
127.0.0.1:6379> del a             # 重复key 会报错# 列表
127.0.0.1:6379> lpush a redis
127.0.0.1:6379> lpush a rabbitmq
127.0.0.1:6379> lpush a 1
127.0.0.1:6379> lrange a 0 10
'''
1) "1"
2) "rabbitmq"
3) "redis"
'''# 集合
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 2
127.0.0.1:6379> smembers a
'''
1) "1"
2) "2"
'''# 有序集合 zadd key score member 
127.0.0.1:6379> zadd a 1 张三
127.0.0.1:6379> zadd a 1 李四
127.0.0.1:6379> zadd a 2 王五
127.0.0.1:6379> zrangebyscore a 0 1   # 选去0-1分的集合元素'''
1) "\xe5\xbc\xa0\xe4\xb8\x89"
2) "\xe5\xe6\x9d\x8e\xe5\x9b\x9b"
'''

2.2 安装Celery

# 安装celery
(airflow)[dp96]$ pip install celery -i https://pypi.tuna.tsinghua.edu.cn/simple(airflow)[dp95]$ pip install celery
(airflow)[dp97]$ pip install celery# 安装 redis Celery 远程连接redis服务时使用
(airflow)[dp96]$ pip install redis
(airflow)[dp95]$ pip install redis
(airflow)[dp97]$ pip install redis

3、功能测试

3.1 创建任务

3.1.1 在dp96 机器上创建应用文件 tasks.py
# 创建 tasks.py 
from celery import Celery
import timeapp = Celery('tasks', broker='redis://10.18.18.96:6379/0',backend='redis://10.18.18.96:6379/1') # broker 任务队列;backend 结果存储数据库@app.task
def time_sleep(n):time.sleep(n) return f'延时{n}s函数'
3.1.2 分发应用到dp95、dp97 机器相同位置
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp95:~/celery
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp97:~/celery

3.2 启动worker服务

3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
# 以下是Celery命令入口点的选项解释:
-A, --app APPLICATION: 指定Celery应用的模块名或路径。
-b, --broker TEXT: 指定消息代理的地址,例如RabbitMQ或Redis。
--result-backend TEXT: 指定任务结果的后端存储地址。
--loader TEXT: 指定Celery加载器的类型。
--config TEXT: 指定配置文件的路径。
--workdir PATH: 指定Celery工作目录。
-C, --no-color: 禁用彩色输出。
-q, --quiet: 静默模式,减少输出。
--version: 显示Celery版本号。
--skip-checks: 跳过配置检查。
# tasks 是我们任务所在的文件名,workdir tasks.py任务文件所在目录 worker 表示启动的是 worker 程序
(airflow)[dp96 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info(airflow)[dp95 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info
(airflow)[dp97 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果

在这里插入图片描述
在这里插入图片描述

3.3 执行分布式异步任务

3.3.1 在dp96另一个shell界面,在task.py文件同目录下,创建test.py文件发布任务

from tasks import time_sleep
import timedef test():start = time.time()res = []for i in range(10):res_ = time_sleep.delay(5)  # 发布任务到broker (redis),正常同步任务执行时间10*5=50sstop = time.time()print(f'运行时间:{stop-start}')

在这里插入图片描述

3.4 查看异步任务结果

dp95:
在这里插入图片描述

dp96:

在这里插入图片描述

dp97:

在这里插入图片描述

如图所示,3台机器分别从队列中取出了3、4、3个睡眠5秒的任务,并在各自机器上并行(异步)运行,3台机器累计运行任务共花费5s

redis 后台结果数据库:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/669723.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《Video Mamba Suite》论文笔记(2)Mamba对于多模态交互的作用

原文翻译 4.2 Mamba for Cross-Modal Interaction Tasks and datasets.除了单模态任务外,我们还评估了 Mamba 在跨模态交互方面的性能。我们首先使用视频时间定位 (Video Temporal Grounding) 任务进行评估。所涉及的数据集包含 QvHighlight [44] 和 Charade-STA …

【Scala---01】Scala简介与环境部署『 Scala简介 | 函数式编程简介 | Scala VS Java | 安装与部署』

文章目录 1. Scala简介2. 函数式编程简介3. Scala VS Java4. 安装与部署 1. Scala简介 Scala是由于Spark的流行而兴起的。Scala是高级语言,Scala底层使用的是Java,可以看做是对Java的进一步封装,更加简洁,代码量约是Java的一半。…

如何在ubuntu上安装idear

在官网下载压缩包 下载 IntelliJ IDEA – 领先的 Java 和 Kotlin IDE (jetbrains.com.cn) idea——java开发者工具,有社区办(免费)专业版(收费)这两个版本,可自己选择,还有就是不同的系统对应…

什么是期货?期货的基础知识有哪些?

期货是一种标准化的远期合约,允许买卖双方在未来特定时间以预定价格交易货物或金融资产。也是一种金融衍生品,它为市场参与者提供了一种管理价格波动风险和进行投资的工具。 期货的基础知识有哪些 期货市场是一个复杂的金融环境,对于初学者来…

Linux 操作系统IPC

目录 1、IPC简介 1.1、共享内存 1.1.1 创建/访问共享内存 1.1.2 映射 1.1.3 解除映射 1.1.4 删除/修改共享内存 1.2 信号量集 1.2.1 创建信号量集合 1.2.2 信号量的初始化 1.2.3 信号量的还原和消耗 1.3 消息队列 1.3.1 概念 1.3.3 添加消息队列 1.3.4 读取消息…

五一反向旅游,景区“AI+视频监控”将持续助力旅游业发展

一、建设背景 每年五一劳动节出去旅游都是人挤人状态,这导致景区的体验感极差。今年“五一反向旅游”的话题冲上了热搜,好多人选择了五一之后再出去旅游,避开拥挤的人群,这个时候景区的监管力度和感知能力就更要跟上去&#xff0…

SQL 基础 | AVG 函数的用法

在SQL中,AVG()是一个聚合函数,用来计算某个列中所有值的平均值。 它通常与GROUP BY子句一起使用,以便对分组后的数据进行平均值计算。 AVG()函数在需要了解数据集中某个数值列的中心趋势时非常有用。 以下是AVG()函数的一些常见用法&#xff…

萤瓴优选:一键带货,轻松赚取佣金!短视频素材软件助你快速上手!

在如今的电商时代,137短视频带货9766成为了一种3209越来越流行的购物方式。想要通过短视频带货赚取佣金,但又苦恼于找不到合适的素材和发布平台?别担心,现在有一款专业的软件可以解决这个问题!(上面数字可联…

学习记录:AUTOSAR R20-11的阅读记录(五)【CP(5.11-5.19)】完

接上回:学习记录:AUTOSAR R20-11的阅读记录(四)【CP(5.6-5.10)】 五、CP 11、General(4个) 5.11 File Name 说明 1 AUTOSAR_EXP_ LayeredSoftwareArchitecture.pdf 描述了AUTO…

艾体宝方案 | 加密USB金融解决方案

在现代金融行业中,保护敏感数据和合规性已成为至关重要的任务。为了帮助金融公司应对移动性风险和合规挑战,我们提供了一种高效的加密USB解决方案。 一、为什么金融公司需要加密USB解决方案 1、降低移动性风险 金融服务公司正在迅速过渡到一种模式&a…

银价下跌怎么办?现货白银买卖分析方法要掌握

现货白银买卖分析是进行现货白银投资的基础,尤其是近几个交易日现货白银价格出现了下跌后,更加凸显了买卖分析能力在市场中的重要性。不光要会买,还得懂得如何卖。下面我们来介绍2个现货白银买卖分析的方法。 基于RSI指标的现货白银买卖分析。…

目标检测——铁路轨道故障数据集

一、重要性及意义 安全性保障:铁路作为重要的交通工具,其安全性能直接关系到乘客和货物的安全。铁路轨道故障,如裂缝、变形、错位、缺失紧固件等,都可能引发列车脱轨、倾覆等严重事故。因此,及时发现和修复这些故障&a…