RabbitMQ手动应答与持久化

1.SleepUtil线程睡眠工具类

package com.hong.utils;/*** @Description: 线程睡眠工具类* @Author: hong* @Date: 2023-12-16 23:10* @Version: 1.0**/
public class SleepUtil {public static void sleep(int second) {try {Thread.sleep(1000*second);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

2.消息生产者

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 22:33* @Version: 1.0**/
public class Task3 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}
}

3.两个消费者

模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)

3.1.处理时间短

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

3.2.处理时间长

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失, 放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

4.结果

启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
在这里插入图片描述
在这里插入图片描述
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
在这里插入图片描述

5.持久化

5.1.队列持久化

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** @Description: 队列持久化* @Author: hong* @Date: 2023-12-17 22:52* @Version: 1.0**/
public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}
}

5.2.消息持久化

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.util.Scanner;/*** @Description: 队列持久化与消息持久化* @Author: hong* @Date: 2023-12-17 22:52* @Version: 1.0**/
public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//队列持久化   true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();//消息持久化  MessageProperties.PERSISTENT_TEXT_PLAINchannel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/283139.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 的基本概念、优势、及在程序开发中的应用

Docker 是一种容器化平台,它通过使用容器化技术,将应用程序及其依赖性打包到一个独立的、可移植的容器中,从而实现应用程序的快速部署、可靠性和可扩展性。 下面是 Docker 的一些基本概念和优势: 容器:Docker 使用容器化技术,将应用程序及其依赖性打包到一个可移植的容器…

计算机网络:自顶向下第八版学习指南笔记和课后实验--运输层

记录一些学习计算机网络:自顶向下的学习笔记和心得 Github地址,欢迎star ⭐️⭐️⭐️⭐️⭐️ 运输层 TCP: 传输控制协议 报文段 UDP: 用户数据包协议 数据报 将主机间交付扩展到进程间交付被称为运输层的多路复用与多路分解 将运输层…

华为鸿蒙应用--欢迎页SplashPage+倒计时跳过(自适应手机和平板)-ArkTs

鸿蒙ArkTS 开发欢迎页SplashPage倒计时跳过,可自适应平板和手机: 一、SplashPage.ts import { BreakpointSystem, BreakPointType, Logger, PageConstants, StyleConstants } from ohos/common; import router from ohos.router;Entry Component struct…

饥荒Mod 开发(十):制作一把AOE武器

饥荒Mod 开发(九):物品栏排列 饥荒Mod 开发(十一):修改物品堆叠 前面的文章介绍了很多基础知识以及如何制作一个物品,这次制作一把武器,装备之后可以用来攻击怪物。 制作武器贴图和动画 1.1 制作贴图。 先准备一张武器的贴图&a…

【机器学习】梯度下降法:从底层手写实现线性回归

【机器学习】Building-Linear-Regression-from-Scratch 线性回归 Linear Regression0. 数据的导入与相关预处理0.工具函数1. 批量梯度下降法 Batch Gradient Descent2. 小批量梯度下降法 Mini Batch Gradient Descent(在批量方面进行了改进)3. 自适应梯度…

MATLAB图像处理技巧

MATLAB图片处理------动态绘图 1. 动态绘图2. XXXXX 1. 动态绘图 主要用到四个函数,分别为getframe、frame2im、rgb2ind以及imwrite: 1.getframe:获取当前绘图窗口的图片作为影片帧; 2.frame2im:从单个影片帧 F 返回索…

Eclipse 自动生成注解,如果是IDEA可以参考编译器自带模版进行修改

IDEA添加自动注解 左上角选择 File -> Settings -> Editor -> File and Code Templates&#xff1b; 1、添加class文件自动注解&#xff1a; ​/*** <b>Function: </b> todo* program: ${NAME}* Package: ${PACKAGE_NAME}* author: Jerry* date: ${YEA…

为什么在Android中需要Context?

介绍 在Android开发中&#xff0c;Context是一个非常重要的概念&#xff0c;但是很多开发者可能并不清楚它的真正含义以及为什么需要使用它。本文将详细介绍Context的概念&#xff0c;并解释为什么在Android应用中需要使用它。 Context的来源 Context的概念来源于Android框架…

FPGA设计与实战之时钟及时序简介1

文章目录 一、时钟定义二、基本时序三、总结一、时钟定义 我们目前设计的电路以同步时序电路为主,时钟做为电路工作的基准而显得非常重要。 简单的接口电路比如I2C、SPI等,复杂一点接口比如Ethernet的MII、GMII等接口,它们都有一个或多个时钟信号。 那么什么是时钟信号?它…

【JAVA-Day69】抛出异常的精髓:深度解析 throw、throws 关键字,优雅处理异常问题

抛出异常的精髓&#xff1a;深度解析 throw、throws 关键字&#xff0c;优雅处理异常问题 &#x1f680; 抛出异常的精髓&#xff1a;深度解析 throw、throws 关键字&#xff0c;优雅处理异常问题 &#x1f680;一、什么是抛出异常 &#x1f60a;二、如何抛出异常 &#x1f914…

桌面概率长按键盘无法连续输入问题

问题描述&#xff1a;概率性长按键盘无法连续输入文本 问题定位&#xff1a; 系统按键流程分析 图一 系统按键流程 按键是由X Server接收的&#xff0c;这一点只要明白了X Window的工作机制就不难理解了。X Server在接收到按键后&#xff0c;会转发到相应程序的窗口中。在窗…

将mjpg格式数转化成opencv Mat格式

该博客可以解决如下两个问题&#xff1a; 1、将mjpg格式数据转化成opencv Mat格式 2、v4l2_buffer 格式获取的mjpg格式数据转换成Mat格式。 要将 MJPEG 格式的数据转换为 OpenCV 的 Mat 格式&#xff0c;您可以使用 imdecode 函数。imdecode 函数可以将图像数据解码为 Mat 对象…