RabbitMQ---work消息模型

1、work消息模型

工作队列或者竞争消费者模式
在这里插入图片描述

在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。
工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:

o P:生产者:任务的发布者
o C1:消费者,领取任务并且完成任务,假设完成速度较快
o C2:消费者2:领取任务并完成任务,假设完成速度慢

面试题:避免消息堆积?

1)采用workqueue,多个消费者监听同一队列。
2)接收到消息以后,而是通过线程池,异步消费。

1.1、生产者

生产者与案例1中的几乎一样:

public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}// 关闭通道和连接channel.close();connection.close();}
}

不过这里我们是循环发送50条消息。

1.2、消费者1

// 消费者1
public class Recv {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");try {// 模拟完成任务的耗时:1000msThread.sleep(1000);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

1.3、消费者2

//消费者2
public class Recv2 {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");try {// 模拟完成任务的耗时:200msThread.sleep(200);} catch (InterruptedException e) {}// 手动ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列。channel.basicConsume(QUEUE_NAME, false, consumer);}
}

与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:
在这里插入图片描述
在这里插入图片描述

可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。

1.4、能者多劳

• 刚才的实现有问题吗?
o 消费者1比消费者2的效率要低,一次任务的耗时较长
o 然而两人最终消费的消息数量是一样的
o 消费者2大量时间处于空闲状态,消费者1一直忙碌
• 现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
• 怎么实现呢?
o 我们可以使用basicQos方法和prefetchCount = 1设置。
o 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。
o 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。
o 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
在这里插入图片描述

再次测试:

  ![在这里插入图片描述](https://img-blog.csdnimg.cn/a25bdfcd50bf41f9a6e51076560cd15f.png)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/80679.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为什么网络互联地址设置为30位地址

对于点对点链路&#xff0c;为了节约IPv4地址&#xff0c;一般为其分配/30地址块&#xff0c;这样包含4个地址&#xff1a;最小地址作为网络地址&#xff0c;最大地址作为广播地址&#xff0c;剩余两个可分配地址&#xff0c;分配给链路两端的接口&#xff0c;这是最普遍的方法…

Pandas学习笔记

Pandas数据分析处理库 数据预处理 导入一份泰坦尼克号乘客数据 df.head()展示读取数据&#xff0c;默认读取前5行 df.tail()默认读取后5行 df.head(10)读取前10行DataFrame结构 Pandas工具包的基础结构&#xff0c;二维矩阵结构&#xff0c;行表示数据样本&#xff0c;列表示…

解决nginx的负载均衡下上传webshell的问题

目录 环境 问题 访问的ip会变动 执行命令的服务器未知 上传大文件损坏 深入内网 解决方案 环境 ps :现在已经拿下服务器了&#xff0c;要解决的是负载均衡问题, 以下是docker环境&#xff1a; 链接: https://pan.baidu.com/s/1cjMfyFbb50NuUtk6JNfXNQ?pwd1aqw 提…

密码学学习笔记(二十一):SHA-256与HMAC、NMAC、KMAC

SHA-256 SHA-2是广泛应用的哈希函数&#xff0c;并且有不同的版本&#xff0c;这篇博客主要介绍SHA-256。 SHA-256算法满足了哈希函数的三个安全属性&#xff1a; 抗第一原像性 - 无法根据哈希函数的输出恢复其对应的输入。抗第二原像性 - 给定一个输入和它的哈希值&#xf…

回归预测 | MATLAB实现DBN-ELM深度置信网络结合极限学习机多输入单输出回归预测

回归预测 | MATLAB实现DBN-ELM深度置信网络结合极限学习机多输入单输出回归预测 目录 回归预测 | MATLAB实现DBN-ELM深度置信网络结合极限学习机多输入单输出回归预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现DBN-ELM深度置信网络结合极限学习…

如何深入理解 Node.js 中的流(Streams)

Node.js是一个强大的允许开发人员构建可扩展和高效的应用程序。Node.js的一个关键特性是其内置对流的支持。流是Node.js中的一个基本概念&#xff0c;它能够实现高效的数据处理&#xff0c;特别是在处理大量信息或实时处理数据时。 在本文中&#xff0c;我们将探讨Node.js中的流…

计算机竞赛 基于GRU的 电影评论情感分析 - python 深度学习 情感分类

文章目录 1 前言1.1 项目介绍 2 情感分类介绍3 数据集4 实现4.1 数据预处理4.2 构建网络4.3 训练模型4.4 模型评估4.5 模型预测 5 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于GRU的 电影评论情感分析 该项目较为新颖&#xff0c;适合作为竞…

【Cortex-M3权威指南】学习笔记1 - 概览与基础

介绍 三种主流 Cortex 款式 款式 A&#xff1a;设计用于高性能的“开放应用平台” 款式 R&#xff1a;用于高端的嵌入式系统&#xff0c;尤其是那些带有实时要求的 款式 M&#xff1a;用于深度嵌入的&#xff0c;单片机风格的系统中 指令集发展 ARM 处理器一直支持两种形式上…

【Unity自制手册】游戏基础API大全

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

Eduma主题 - 线上教育WordPress主题/网站

Eduma主题 – 线上教育WordPress主题是为教育网站、LMS、培训中心、课程中心、学院、大学、学校、幼儿园而制作的。基于我们使用以前的主题eLearning WP构建WordPress LMS的经验&#xff0c;Education WP是下一代&#xff0c;也是围绕WordPress最好的教育主题之一&#xff0c;它…

清华源的链接太多老崩溃,我把它拷过来,需要什么点什么

建议按照字母分个类可能会好点 把链接这里改为 哈哈就不卡了&#xff0c;浏览器也不崩溃了还能很快就链接成功 Links for pandas这是链接 这个小技巧教给大家请给我点个赞

详细手机代理IP配置

嗨&#xff0c;亲爱的朋友们&#xff01;作为一家代理产品供应商&#xff0c;我知道有很多小伙伴在使用手机进行网络爬虫和数据采集时&#xff0c;常常会遇到一些IP限制的问题。别担心&#xff01;今天我要给大家分享一下手机IP代理的设置方法&#xff0c;让你们轻松应对这些限…