RabbitMq 入门应用 提升性能 : 算法多阶段并行 (Python)

大问题: 我们有一个算法,它可以被分为多个阶段进行(顺序不可颠倒),每个阶段的性能和资源要求不同(且不均衡程度比较高);

假设我们现在可以堆资源(较多的CPU和内存),如何将算法各个步骤拆分并进行性能均衡和实现,使得算法性能最大化以满足生产要求?

多进程:

由于算法有严格的顺序要求,如果是针对视频、自然语言等前后关联较强的数据,一次算法运行只能顺序处理一个问题;

比如一段视频只能读入并处理,多进程并不解决单个进程处理缓慢的问题;

同时Python自带的多进程数据共享堪称地狱(Manager)比较难以使用;

多线程:

多线程的数据共享度高(内存上并不分离),如果并行很容易出现争抢关键数据的情况,各种锁导致性能下降严重;

因此我们的选择的方法是异步+中间件(RabbitMq)

单纯地异步可以吗?但是Python/Java等语言天生对异步的支持就很差,想提升性能?想都别想!

我们只能像上面这样把算法拆开,在关键的节点处堆性能,减少算法从输入到输出的时延,同时避免单个进程导致的资源请求冲突问题。

那么怎么很方便地使用queue传递数据呢?中间件或者TCP只能传递bytes类型的数据?

对于Python而言有个非常好的方案:Pickle import pickle, pickle.dumps(),pickle.loads() 就足以解决大部分问题(而且足够快!),我们就能得到某些数据的bytes信息,然后直接把它丢进队列就好啦!

注意,对于某些指针对象,如高维 numpy.ndarray 的情形,如果它被封装在字典(dict) 或者 链表 (list) 里面,pickle 很可能只会序列化指针的地址(彻彻底底地浅拷贝),导致数据丢失,需要特别注意!

有关Rabbitmq我们需要掌握的技能

由于本篇Blog侧重工程开发和快速入门,我们只需要知道RabbitMq能够帮我们准确无误的传递消息(bytes);

RabbitMq在Linux上的安装 https://www.rabbitmq.com/docs/install-debian 里面这一块(无脑执行就可以了)

之后的启动,命令行输入: rabbitmq-server /etc/rabbitmq/rabbitmq-env.conf

/etc/rabbitmq/rabbitmq-env.conf 是默认的配置文件的位置,简单的操作我们不需要修改配置文件;

一些常用的操作:命令行输入

查看现在的队列的情况 (能看到现在队列里面消息的情况)

rabbitmqctl list_queues

删除队列

rabbitmqctl delete_queue $Your_queue_name

清空队列消息

rabbitmqctl purge_queue $Your_queue_name

详细内容请参考:

https://www.cnblogs.com/thomas-fan/p/15888556.html (比较齐全)

之后建议去看一看 HelloWorld和第一个工作模式(Worker),不过大部分的参数都不需要改动就能满足简单的需求

Python 使用

首先安装pika pip inistall pika 就可以

其实我们只要学会Hello World里面的内容就能做很多事情了!

newTask.py

#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent))
print(f" [x] Sent {message}")
connection.close()

worker.py

#!/usr/bin/env python
import pika
import timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):print(f" [x] Received {body.decode()}")time.sleep(body.count(b'.'))print(" [x] Done")ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue',on_message_callback=callback)channel.start_consuming()

这两个程序运行起来(其中 worker.py 多跑几个,然后不停地运行 newTask.py 发送带有 '.' 的消息 如 '123...' 你就能观察到队列的运行啦(多个进程分别处理消息)!)

程序中重点语句:

channel.basic_consume(queue='task_queue',on_message_callback=callback) 监听队列,当有消息产生的时候调用 callback函数! ‘task_queue’是监听的队列

channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent))

发消息,往 'task_queue' 里面发

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)

启动和注册队列;

更加进阶的知识:

https://www.rabbitmq.com/tutorials/tutorial-two-python

https://www.cnblogs.com/guyuyun/p/14970592.html

总结:

例子中的消息是持久化的,消息和队列若不被处理和删除就会一直存在于队列当中

对于大部分的上手的调试工作,清除队列消息再重启或者在保留消息的情形下进入消息处理程序进行Debug都是非常方便且常用的方法!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/804098.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Github使用技巧

1、怎么查找对应关系 一般查找对应关系,可以去阅读下面的说明: 2、0.20.Release版本不存在的问题如上图:根本不知道0.2.1.RELEASE对应的dubbo-spring-boot-start版本是多少,在maven仓库中根本没有这个版本 解释:这里需要我们点击这个版本,然后就会跳转到这个代码上,如…

微信小程序修改radio颜色

看效果: 代码:<radio-group @change="onRadioChange"><label><radio value="同意" style="margin-right: 30rpx">同意</radio></label><label><radio value="不同意">不同意</radio&g…

springboot对接dubbo遇到的巨坑

1、添加配置jar包<dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-spring-boot-starter</artifactId><version>2.7.4.1</version></dependency><dependency><groupId>org.apache.dubbo</gr…

一款高性价比4g工业路由器,配置简单,网络覆盖强

​各位老铁,今天给大家介绍一下SR600这款工业级4G路由器。这玩意儿是专门为工业环境设计的,比如说化工厂、矿场这种恶劣环境,普通路由器根本扛不住。先说说SR600的硬件配置:处理器: 工业级CPU,厂家没透露具体型号,但性能够用内存: 128MB,满足工业控制需求存储: 16MB Flash,可扩…

VS Code 的SSH连接不成功问题分析与解决

问题描述:多次输入密码,一直连接不上 解决方法; 打开远程服务器中~/.vscode-server/bin/xxx文件夹,此时可以看到一个名为vscode-server.tar.gz,截图如下:上面的37开头的文件夹称为Commit Id,现在利用Commit ID下载远程连接需要的文件。使用这个链接: https://update.cod…

「FJWC2020Day5-zzq」rng 题解

题意简述 一个长度为 \(n\) 的实数序列 \(a_i\),其中 \(a_i\) 为 \([l_i, r_i]\) 中独立均匀随机选取的实数。你只能通过交换相邻两个数,使得 \(a_i\) 单调不降。你需要求出你最少操作次数的期望,对 \(M = 998244353\) 取模。 \(1 \leq n \leq 10^6\),\(0 \leq l_i \lt r_i…

wpf ToggleButton选中效果和一个登录界面

先看效果 我修改了ToggleButton的ControlTemplate,在ContentPresenter外面加了4个Border,控制4个Border的位置在ControlTemplate的左上、右上、左下、右下,选中时,触发4个边框的BorderThickness <Setter Property="BorderThickness&qu…

WindowSystemEvent

Qt中为WindowSystemEvent事件定义了处理函数Handler,通过宏定义和模版来声明定义 ---- QT_DEFINE_QPA_EVENT_HANDLER Matches (25 in 1 files) ---- qwindowsysteminterface.cpp (gui\kernel) line 199 : #define QT_DEFINE_QPA_EVENT_HANDLER(ReturnType, HandlerName, ...) …

基础数据结构之递归

递归 1) 概述 定义 计算机科学中,递归是一种解决计算问题的方法,其中解决方案取决于同一类问题的更小子集In computer science, recursion is a method of solving a computational problem where the solution depends on solutions to smaller instances of the same probl…

JAVA的数组基本用法

array 在声明数组变量时,需要指出数组类型和数组变量名,例如int[] a;不过这条语句只是声明了变量a,并没有将a初始化为一个真正的数组。应该使用new操作符来创建数组。 int[] a = int[100]或者var a = new int[100]数组长度不要求是常数 但是一旦创建了数组,就不能再改变它的…

Unity DatePicker用法,实现UI的日期/时间选择器功能

前言 用Unity3d做一个类似于选时间段,查询数据并展示统计UI的功能 插件 https://assetstore.unity.com/packages/tools/gui/datepicker-for-unityui-68264 样例效果 弹出日期选择器时间范围选择器包含类型 Shared Calendar 共享的日历,这个就是几个选择器共用一个日历来选择时…

正三棱台

正三棱台的相关知识点前情概要 正三棱台的概念,表面积,体积公式,引申 动态演示结合上图,你能说出正三棱台的各部分的名称吗?你能自行画出正三棱台的图形吗? 典例剖析 【2025届高三质检一试题】已知正三棱台 \(ABC-A_1B_1C_1\) 的上底面积为 \(\sqrt{3}\),下底面积为 \(4…