Python开发运维:Celery连接Redis

目录

一、理论

1.Celery

二、实验

1.Windows11安装Redis

2.Python3.8环境中配置Celery

三、问题

1.Celery命令报错

2.执行Celery命令报错

3.Win11启动Celery报ValueErro错误


 

 

 

一、理论

1.Celery

(1) 概念

 Celery是一个基于python开发的分布式系统,它是简单、灵活且可靠的,处理大量消息,专注于实时处理的异步任务队列,同时也支持任务调度。

e603186045544c238c4fb2a222ea12e8.jpeg

 

(2) 架构

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

1)消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等2)任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。3)任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

5312be781c034a5db52c3796f345803f.png

 

 (3)  特点

1)简单
Celery易于使用和维护,并且它不需要配置文件并且配置和使用是比较简单的2)高可用
当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务3)快速
单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级4)灵活
Celery几乎所有部分都可以扩展或单独使用,各个部分可以自定义。

 

(4)场景

Celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常使用它来实现异步任务(async task)和定时任务(crontab)。

1)异步任务
将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等2)定时任务
定时执行某件事情,比如每天数据统计

 

二、实验

1.Windows11安装Redis

(1)下载最新版Redis

Redis-x64-xxx.zip压缩包到D盘,解压后,将文件夹重新命名为 Redis

(2)查看目录

D:\Redis>dir

5ae0dc109bd14283871d6b2729e8b311.png

(3)打开一个 cmd 窗口 使用 cd 命令切换目录到 D:\Redis 运行

redis-server.exe redis.windows.conf

2a3ce0e66a724e6b82652bcd9542b81c.png

 

(4)把 redis 的路径加到系统的环境变量

fea7cb630e4b49b1af6995f17dc6f6a0.png

 

(5)另外开启一个 cmd 窗口,原来的不要关闭,因为先前打开的是redis服务端

 

#切换到 redis 目录下运行
redis-cli.exe -h 127.0.0.1 -p 6379

2baf1597234a4fac8e7675d55476cfdf.png

(6)检测连接是否成功

#设置键值对
set firstKey 123#取出键值对
get firstKey#退出
exit

 

5d02fd34c1e348d19d4230ba1aca676b.png

 

(7)ctrl+c 退出先前打开的服务端

836c461954584108a4b165ef7ea57eae.png

(8)注册Redis服务

#通过 cmd 命令行工具进入 Redis 安装目录,将 Redis 服务注册到 Windows 服务中,执行以下命令
redis-server.exe --service-install redis.windows.conf --loglevel verbose

8bbbba5f87fa4735bed7bafb9cb497ca.png

(9)启动Redis服务

#执行以下命令启动 Redis 服务
redis-server --service-start

650f6e301d1c4d9aa075d9fd1ad05dc6.png

(10)Redis 已经被添加到 Windows 服务中

b77309f2aa214246b83e81213970669e.png

(11)打开Redis服务,将启动类型设置为自动,即可实现开机自启动

8cec0e57cda84312a1185fe0c8960edd.png

 

2.Python3.8环境中配置Celery

(1) PyCharm安装celery+redis

#celery是典型的生产者+消费者的模式,生产者生产任务并加入队列中,消费者取出任务消费。多用于处理异步任务或者定时任务。#第一种方式
pip install celery
pip install redis#第二种方式
pip install -i https://pypi.douban.com/simple celery
pip install -i https://pypi.douban.com/simple redis

6fcc1ca414684d29a18adaff6ba30620.png

 

(2)新建异步任务执行文件celery_task.py.相当于注册了celery app

# -*- coding: utf-8 -*-
from celery import Celery
import time
app = Celery('demo', backend='redis://localhost:6379/1', broker='redis://localhost:6379/2')
@app.task
def send_email(name):print("向%s发送邮件..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return "ok"

e2c71ca92a1841818b47e6cb8349db96.png

(3) 在项目文件目录下创建worker消费任务

PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO-------------- celery@node1 v5.3.5 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-11-22 17:26:39
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         test:0x1e6fa358550
- ** ---------- .> transport:   redis://127.0.0.1:6379/2
- ** ---------- .> results:     redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery           exchange=celery(direct) key=celery[tasks]. celerypro.celery_task.send_email[2023-11-22 17:26:39,265: WARNING/MainProcess] d:\soft\python38\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
[2023-11-22 20:30:08,249: INFO/MainProcess] mingle: searching for neighbors
[2023-11-22 20:30:15,379: INFO/MainProcess] mingle: all alone
[2023-11-22 20:30:25,608: INFO/MainProcess] celery@node1 ready.

3c72f0c84b774bc6b4698d3546168029.png

98d51e6e8dd740e495f85fa8298734ca.png

 

(4)ctrl+c 退出

140565377d5a4ff1b3a34f9026d5ae0f.png

(5)修改celery_task.py文件,增加一个task

# -*- coding: utf-8 -*-
from celery import Celery
import time
app = Celery('demo', backend='redis://localhost:6379/1', broker='redis://localhost:6379/2')
@app.task
def send_email(name):print("向%s发送邮件..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return "ok"
@app.task
def send_msg(name):print("向%s发送短信..."%name)time.sleep(5)print("向%s发送邮件完成"%name)return "ok"

66807234d7e64b8891d82f81bea6cc48.png

(6)再次在项目文件目录下创建worker消费任务

PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO-------------- celery@node1 v5.3.5 (emerald-rush)
--- ***** ----- 
-- ******* ---- Windows-10-10.0.22621-SP0 2023-11-22 21:01:43
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         demo:0x29cea446250
- ** ---------- .> transport:   redis://localhost:6379/2
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 32 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- -------------- [queues].> celery           exchange=celery(direct) key=celery[tasks]. celerypro.celery_task.send_email. celerypro.celery_task.send_msg[2023-11-22 21:01:43,381: WARNING/MainProcess] d:\soft\python38\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-23] child process 23988 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-17] child process 16184 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-21] child process 22444 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-27] child process 29480 calling self.run()
[2023-11-22 21:01:43,612: INFO/SpawnPoolWorker-24] child process 5844 calling self.run()
[2023-11-22 21:01:43,631: INFO/SpawnPoolWorker-25] child process 8896 calling self.run()
[2023-11-22 21:01:43,634: INFO/SpawnPoolWorker-29] child process 28068 calling self.run()
[2023-11-22 21:01:43,634: INFO/SpawnPoolWorker-28] child process 18952 calling self.run()
[2023-11-22 21:01:43,636: INFO/SpawnPoolWorker-26] child process 13680 calling self.run()
[2023-11-22 21:01:43,638: INFO/SpawnPoolWorker-31] child process 25472 calling self.run()
[2023-11-22 21:01:43,638: INFO/SpawnPoolWorker-30] child process 28688 calling self.run()
[2023-11-22 21:01:43,638: INFO/SpawnPoolWorker-32] child process 10072 calling self.run()
[2023-11-22 21:01:45,401: INFO/MainProcess] Connected to redis://localhost:6379/2
[2023-11-22 21:01:45,401: WARNING/MainProcess] d:\soft\python38\lib\site-packages\celery\worker\consumer\consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.warnings.warn([2023-11-22 21:01:49,477: INFO/MainProcess] mingle: searching for neighbors
[2023-11-22 21:01:56,607: INFO/MainProcess] mingle: all alone
[2023-11-22 21:02:04,753: INFO/MainProcess] celery@node1 ready.

0c78eaa6c74c494888757c8050c10bd7.png

(6)ctrl+c 退出创建执行任务文件produce_task.py

# -*- coding: utf-8 -*-
from celerypro.celery_task  import send_email,send_msg
result = send_email.delay("david")
print(result.id)
result2 = send_msg.delay("mao")
print(result2.id)

e09e10141a60455ab943cad7e1fae99f.png

 

(7)运行produce_task.py

5521912205114196b1b4c583d983cb37.png

(8)同时取到id值

07f8d1c9596b426c8e87cf9550885939.png

(9)如遇到报错需要安装包 eventlet

PS D:\soft\pythonProject> pip install eventlet

8c95c48bc5504bf0b2d648256968509b.png
(10)重新在项目文件目录下创建worker消费任务

PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO -P eventlet-------------- celery@node1 v5.3.5 (emerald-rush)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-11-22 21:29:34
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         demo:0x141511962e0
- ** ---------- .> transport:   redis://localhost:6379/2
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 32 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ------------------- [queues].> celery           exchange=celery(direct) key=celery[tasks]. celerypro.celery_task.send_email. celerypro.celery_task.send_msgr_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.warnings.warn([2023-11-22 21:29:48,022: INFO/MainProcess] pidbox: Connected to redis://localhost:6379/2.
[2023-11-22 21:29:52,117: INFO/MainProcess] celery@node1 ready.


b9b48832640a472f8eb3366cdc9fe2dc.png

(11) 运行produce_task.py

5521912205114196b1b4c583d983cb37.png

(12)生成id

007b9689be2b46439d14211b0085d0e3.png

(13)查看任务消息

[2023-11-22 21:30:35,194: INFO/MainProcess] Task celerypro.celery_task.send_email[c1a473d5-49ac-4468-9370-19226f377e00] received
[2023-11-22 21:30:35,195: WARNING/MainProcess] 向david发送邮件...
[2023-11-22 21:30:35,197: INFO/MainProcess] Task celerypro.celery_task.send_msg[de30d70b-9110-4dfb-bcfd-45a61403357f] received
[2023-11-22 21:30:35,198: WARNING/MainProcess] 向mao发送短信...
[2023-11-22 21:30:40,210: WARNING/MainProcess] 向david发送邮件完成
[2023-11-22 21:30:40,210: WARNING/MainProcess] 向mao发送邮件完成
[2023-11-22 21:30:42,270: INFO/MainProcess] Task celerypro.celery_task.send_msg[de30d70b-9110-4dfb-bcfd-45a61403357f] succeeded in 7.063000000001921s: 'ok'
[2023-11-22 21:30:42,270: INFO/MainProcess] Task celerypro.celery_task.send_email[c1a473d5-49ac-4468-9370-19226f377e00] succeeded in 7.063000000001921s: 'ok'

dd07e64e52474ee9a14aeee201e655fd.png

(14)创建py文件:result.py,查看任务执行结果

取第2个id:de30d70b-9110-4dfb-bcfd-45a61403357f

# -*- coding: utf-8 -*-
from celery.result import AsyncResult
from celerypro.celery_task import app
async_result = AsyncResult(id="de30d70b-9110-4dfb-bcfd-45a61403357f", app=app)
if async_result.successful():result = async_result.get()print(result)
elif async_result.failed():print('执行失败')
elif async_result.status == 'PENDING':print('任务等待中被执行')
elif async_result.status == 'RETRY':print('任务异常后正在重试')
elif async_result.status == 'STARTED':print('任务已经开始被执行')

3cb2468f7ef94b5596325b7a77c40382.png

(15) 运行result.py文件

f4278e4abcf34edc95c652941ef3646f.png

(16)输出ok

de18aec5d2bf43b4acb8fb91a82b1711.png

 

三、问题

1.Celery命令报错

(1)报错

66136da541e742eeb913a0b4ddba106f.png

428d102f7add401fbf1c8c695e690f0e.png

(2)原因分析

celery版本不同命令不同。

查看帮助命令

PS D:\soft\pythonProject> celery --help
Usage: celery [OPTIONS] COMMAND [ARGS]...Celery command entrypoint.Options:-A, --app APPLICATION-b, --broker TEXT--result-backend TEXT--loader TEXT--config TEXT--workdir PATH-C, --no-color-q, --quiet--version--skip-checks          Skip Django core checks on startup. Setting theSKIP_CHECKS environment variable to any non-emptystring will have the same effect.--help                 Show this message and exit.Commands:amqp     AMQP Administration Shell.beat     Start the beat periodic task scheduler.call     Call a task by name.control  Workers remote control.events   Event-stream utilities.graph    The ``celery graph`` command.inspect  Inspect the worker at runtime.list     Get info from broker.logtool  The ``celery logtool`` command.migrate  Migrate tasks from one broker to another.multi    Start multiple worker instances.purge    Erase all messages from all known task queues.report   Shows information useful to include in bug-reports.result   Print the return value for a given task id.shell    Start shell session with convenient access to celery symbols.status   Show list of workers that are online.upgrade  Perform upgrade between versions.worker   Start worker instance.
PS D:\soft\pythonProject> celery  worker --help
Usage: celery worker [OPTIONS]Start worker instance.Examples--------$ celery --app=proj worker -l INFO$ celery -A proj worker -l INFO -Q hipri,lopri$ celery -A proj worker --concurrency=4$ celery -A proj worker --concurrency=1000 -P eventlet$ celery worker --autoscale=10,0Worker Options:-n, --hostname HOSTNAME         Set custom hostname (e.g., 'w1@%%h').Expands: %%h (hostname), %%n (name) and %%d,(domain).-D, --detach                    Start worker as a background process.-S, --statedb PATH              Path to the state database. The extension'.db' may be appended to the filename.-l, --loglevel [DEBUG|INFO|WARNING|ERROR|CRITICAL|FATAL]Logging level.-O, --optimization [default|fair]Apply optimization profile.--prefetch-multiplier <prefetch multiplier>Set custom prefetch multiplier value forthis worker instance.Pool Options:-c, --concurrency <concurrency>Number of child processes processing thequeue.  The default is the number of CPUsavailable on your system.-P, --pool [prefork|eventlet|gevent|solo|processes|threads|custom]Pool implementation.-E, --task-events, --events     Send task-related events that can becaptured by monitors like celery events,celerymon, and others.--time-limit FLOAT              Enables a hard time limit (in secondsint/float) for tasks.--soft-time-limit FLOAT         Enables a soft time limit (in secondsint/float) for tasks.--max-tasks-per-child INTEGER   Maximum number of tasks a pool worker canexecute before it's terminated and replacedby a new worker.--max-memory-per-child INTEGER  Maximum amount of resident memory, in KiB,that may be consumed by a child processbefore it will be replaced by a new one.  Ifa single task causes a child process toexceed this limit, the task will becompleted and the child process will bereplaced afterwards. Default: no limit.--scheduler TEXTDaemonization Options:-f, --logfile TEXT  Log destination; defaults to stderr--pidfile TEXT--uid TEXT--gid TEXT--umask TEXT--executable TEXTOptions:--help  Show this message and exit.

(3)解决方法

修改命令

PS D:\soft\pythonProject> celery --app=celerypro.celery_task worker -n node1 -l INFO

成功

577654ea7a024e639f347a4282e8da16.png

 

2.执行Celery命令报错

(1)报错

AttributeError: 'NoneType' object has no attribute 'Redis'

5f0ebb3ce11b46969600d1af9d0a2b31.png

 

(2)原因分析

PyCharm未安装redis插件。

(3)解决方法

安装redis插件

d2610162bdb04d24aa06a5941fa820de.png

 

3.Win11启动Celery报ValueErro错误

(1)报错

Windows 在开发 Celery 异步任务,通过命令 celery --app=celerypro.celery_task worker -n node1 -l INFO 启动 Celery 服务后正常;

但在使用 delay() 调用任务时会出现以下报错信息:

Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
 

388cc7dd79524e98b34cfacbe7cb594a.png

(2)原因分析

PyCharm未安装eventlet

(3)解决方法

安装包 eventlet

pip install eventlet

6c57942c19d44ded89153a51b9e3e41b.png

通过以下命令启动服务

celery --app=celerypro.celery_task worker -n node1 -l INFO -P eventlet

0565710b258946c48643a046ea458b0f.png

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/207754.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【三维几何学习】自制简单的网格可视化软件 — Mesh Visualization

自制简单的网格可视化软件 — Mesh Visualization 引言一、整体框架1.1 三角形网格1.2 界面管理1.3 VTK可视化界面 二、核心源码2.1 三角形网格&#xff1a;TriMesh类2.2 界面Widget_Mesh_Manager2.3 VTK可视化2.4 main 引言 使用PyQt自制简单的网格可视化软件 - 视频展示 本是…

深信服技术认证“SCSA-S”划重点:信息收集

为帮助大家更加系统化地学习网络安全知识&#xff0c;以及更高效地通过深信服安全服务认证工程师考核&#xff0c;深信服特别推出“SCSA-S认证备考秘笈”共十期内容&#xff0c;“考试重点”内容框架&#xff0c;帮助大家快速get重点知识~ 划重点来啦 深信服安全服务认证工程师…

基于材料生成算法优化概率神经网络PNN的分类预测 - 附代码

基于材料生成算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于材料生成算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于材料生成优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…

【Java 进阶篇】从Java对象到JSON:Jackson的魔法之旅

在现代的软件开发中&#xff0c;处理数据的能力是至关重要的。而当我们谈及数据格式时&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;通常是首选。为了在Java中轻松地将对象转换为JSON&#xff0c;我们需要一种强大而灵活的工具。这时&#xff0c;Jackso…

蓝桥杯物联网_STM32L071_2_继电器控制

CubeMX配置&#xff1a; Function.c及Function.h&#xff1a; #include "Function.h" #include "gpio.h" void Function_LD5_ON(void){HAL_GPIO_WritePin(LD5_GPIO_Port, LD5_Pin, GPIO_PIN_RESET); }void Function_LD5_OFF(void){HAL_GPIO_WritePin(LD5_…

redis五种基本数据类型

redis存储任何类型的数据都是以key-value形式保存&#xff0c;并且所有的key都是字符串&#xff0c;所以讨论基础数据结构都是基于value的数据类型 常见的5种数据类型是&#xff1a;String、List、Set、Zset、Hash 一) 字符串(String) String是redis最基本的类型&#xff0c;v…

3d标签云实现过程(tagcloud.js)同步原生和 vue

写在前面 本来是没有准备写这个知识点&#xff0c;但是下载这个 js 的时候发现很多都是要钱或者是积分的&#xff0c;我就不明白了一个开源了这么久的 js 怎么还有人拿来挣钱的&#xff0c;同时还有一些只有原生 html 的例子&#xff0c;但是现在都是 框架主导的一些项目&#…

【代码随想录】算法训练计划30

【代码随想录】算法训练计划30 1、51. N 皇后 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行或同一列或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;…

为UE和Unity开发者准备的Godot指南

为UE和Unity开发者准备的Godot指南 ——两位大哥打架&#xff0c;请带上我 这两天游戏行业又开始热闹了&#xff0c;昨天两条信息直接刷爆朋友圈&#xff0c;最大的两家游戏引擎公司怼起来了。 《为Unity开发者准备的虚幻引擎指南》&#xff1a; 为Unity开发者准备的虚幻引擎指…

【UE】用样条线实现测距功能(上)

目录 效果 步骤 一、创建样条网格体组件3D模型 二、实现点击连线功能 三、实现显示两点间距离功能 效果 步骤 一、创建样条网格体组件3D模型 创建一个圆柱模型&#xff0c;这里底面半径设置为10mm&#xff0c;高度设置为1000mm 注意该模型的坐标轴在如下位置&#xff1…

智能导视电子指路牌是什么?

SVIP-3800系列智能电子指路牌也称智慧指路灯杆&#xff0c;智能指路牌&#xff0c;导航立柱&#xff0c;多功能指示牌&#xff0c;多功能路标&#xff0c;智能指路机器人&#xff0c;智能导视指路牌&#xff0c;问路导航机器人&#xff0c;智能路牌&#xff0c;叁仟智慧路牌、智…

Vue3常用操作

一、Vue3项目构建 1、安装最新版本vue npm create vuelatest 2、选择需要的配置 3、进入项目 cd 项目名称 4、下载依赖 npm install 5、启动项目 npm run dev