【初始RabbitMQ】工作队列的实现

工作队列

工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

轮训分发消息

我们启动两个工作线程,一个消息发送线程,一个用来接受线程,我们来看看它们两个工作线程是如何工作的

抽取工具类

我们将获取信道这个重复的代码封装为一个类,当时用的时候直接调用

/*** 连接工厂创建信道工具类*/
public class RabbitMqUtils {public static Channel getChannel(){ConnectionFactory factory = new ConnectionFactory();factory.setHost("118.31.6.132");factory.setUsername("admin");factory.setPassword("123");Connection connection = null;try {try {connection = factory.newConnection();} catch (IOException e) {e.printStackTrace();}} catch (TimeoutException e) {e.printStackTrace();}Channel channel = null;try {channel = connection.createChannel();} catch (IOException e) {e.printStackTrace();}return channel;}
}

这里使用try...catch防止之后每次调用都需要抛出异常

生产者代码

/*** 生产者 发送大量消息*/
public class Task01 {//队列名称public static final String QUEUE_NAME = "hello";//发送大量的消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();/**生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化(磁盘)默认情况时在内存* 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许* 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数 延迟消息等*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台接受消息Scanner scanner = new Scanner(System.in);/*** 发送一个消息* 1.发送到那个交换机* 2.路由的KEY值是哪个 本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送"+message+"完成");}}
}

消费者代码

/*** 这是一个工作线程(消费者)*/
public class Worker01 {//队列名称public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();//消息接受DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//消息接受被取消时CancelCallback cancelCallback = (consumerTag)->{System.out.println(consumerTag+"消息取消消费接口回调");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/System.out.println("C2等待接受消息......................");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

启动两个工作线程

在运行之前我们要修改一个选项,这样我们就不需要重复的写消费者2了

启动一个发送线程 

启动程序,用生产者发送四条消息,

结果分析

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且 是按照有序的一个接收一次消息

 消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息应答的方法

  1. Channel.basicAck(用于肯定确认)
    1. RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认)
    1. 与 Channel.basicNack 相比少一个参数(批量应答参数) 不处理该消息了直接拒绝,可以将其丢弃了

Multiple(批量应答)的解释

手动应答的一个好处就是可以批量应答并且减少网络拥堵

multiple 的 true 和 false 代表不同意思:

true:代表批量应答channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

false:同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息

消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码

睡眠工具类:

public class SleepUtils {public static void sleep(int second){try {Thread.sleep(1000*second);} catch (InterruptedException e) {e.printStackTrace();}}
}

生产者:

/*
* 消息再手动应答不丢失、放回消息队列重新消费*/
public class Task2 {//队列名称public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//从控制台中输入信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息:"+message);}}
}

消费者01:

public class Work01 {private static final String ACK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息短");//消息消费的时候如何处置消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody());SleepUtils.sleep(1);System.out.println("接收到消息:"+message);/*** 1.消息标记tag* 2.是否批量应答未应答的消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/boolean autoAck = false;channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

消费者02:

public class Work02 {private static final String ACK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息长");//消息消费的时候如何处置消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody());SleepUtils.sleep(30);System.out.println("接收到消息:"+message);/*** 1.消息标记tag* 2.是否批量应答未应答的消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/boolean autoAck = false;channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

手动应答效果演示

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/473279.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

吴恩达机器学习全课程笔记第一篇

目录 前言 P1 - P8 监督学习 ​无监督学习 P9-P14 线性回归模型 成本(代价)函数 P15-P20 梯度下降 P21-P24 多类特征 向量化 多元线性回归的梯度下降 P25-P30 特征缩放 检查梯度下降是否收敛 学习率的选择 特征工程 多项式回归 前言…

【计算机网络】DNS 因特网的目录服务

标识主机 主机名 hostname 虽然简单易记忆,但是不能提供更详细的信息IP地址 IP address 由四个字节组成DNS 提供的服务 因为主机名和IP地址面向对象的不同,所以需要提供一个服务为其转换,在这个背景下,DNS (Domain Nam…

【C++ STL】你真的了解string吗?浅谈string的底层实现

文章目录 底层结构概述扩容机制浅拷贝与深拷贝插入和删除的效率浅谈VS和g的优化总结 底层结构概述 string可以帮助我们很好地管理字符串,但是你真的了解她吗?事实上,string的设计是非常复杂的,拥有上百个接口,但最常用…

BulingBuling - 《工作中的焦虑》 [ Anxiety at Work ]

工作中的焦虑 帮助团队建立复原力、处理不确定性和完成任务的8项策略 作者:阿德里安-戈斯蒂克、切斯特-埃尔顿和安东尼-戈斯蒂克 Anxiety at Work 8 Strategies to Help Teams Build Resilience, Handle Uncertainty, and Get Stuff Done By Adrian Gostick and…

[超分辨率重建]ESRGAN算法训练自己的数据集过程

一、下载数据集及项目包 1. 数据集 1.1 文件夹框架的介绍,如下图所示:主要有train和val,分别有高清(HR)和低清(LR)的图像。 1.2 原图先通过分割尺寸的脚本先将数据集图片处理成两个相同的图像…

BUGKU-WEB game1

题目描述 题目截图如下: 进入场景看看: 是一个盖楼的游戏! 解题思路 先看看源码,好像没发现什么特别的是不是要得到一定的分数才会有对应的flag?查看下F12,请求链接发现,这不就提示了 相…

工业数据采集的时间不确定性及PLC-Recorder的通道偏移功能

目录 一、缘起 二、效果展示 三、设置方法 四、小结 一、缘起 大家都知道采集软件首先要尽可能还原数据原来的状态,给用户提供一个可以信赖的参考。但是,数据采集又有很多随机因素:Windows是一个周期不严格的系统、以太网通讯有时间波动、…

【开源】SpringBoot框架开发智能教学资源库系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 课程档案模块2.3 课程资源模块2.4 课程作业模块2.5 课程评价模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 课程档案表3.2.2 课程资源表3.2.3 课程作业表3.2.4 课程评价表 四、系统展示五、核心代…

RM电控工程讲义

HAL_CAN_RxFifo0MsgPendingCallback(CAN_HandleTypeDef *hcan) 是一个回调函数,通常在STM32的HAL库中用于处理CAN(Controller Area Network)接收FIFO 0中的消息。当CAN接口在FIFO 0中有待处理的消息时,这个函数会被调用。 HAL库C…

centos中docker操作+安装配置django+mysql5.7并使用simpleui美化管理后台

一、安装docker 确保系统是CentOS 7并且内核版本高于3.10,可以通过uname -r命令查看内核版本。 更新系统软件包到最新版本,可以使用命令yum update -y。 安装必要的软件包,包括yum-utils、device-mapper-persistent-data和lvm2。使用命令yum install -y yum-utils devic…

蓝桥杯 星期计算

思路1 由于2022太大,用double来存储,即(52022 % 7) % 7即可 int num 5;int t (int)(Math.pow(20,22)%7);num t;num%7;System.out.println(num1);思路2 你需要知道 (a * b ) % p a % p * b % p Scanner scan new Scanner(System.in);int num 1;for…

C语言第二十六弹---字符串函数(下)

✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】 目录 1、strncat 函数的使用 2、strncmp 函数的使用 3、strstr 函数的使用和模拟实现 4、strtok 函数的使用 5、strerror 函数的使用 6、perror 函数的使用…