通过rabbitmq生成延时消息,并生成rabbitmq镜像

通过rabbitmq生成延时消息队列,并生成rabbitmq镜像

  • 整体描述
    • 1. 使用场景
    • 2. 目前问题
    • 3. 前期准备
  • 具体步骤
    • 1. 拉取镜像
    • 2. 运行镜像
    • 3. 安装插件
    • 4. 代码支持
      • 4.1 config文件
      • 4.2 消费监听
      • 4.2 消息生产
    • 5. 功能测试
  • 镜像操作
    • 1. 镜像制作
    • 2. 镜像导入
  • 总结

整体描述

1. 使用场景

在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。

2. 目前问题

之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。

3. 前期准备

需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…

具体步骤

为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。

1. 拉取镜像

首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:

docker pull rabbitmq:3.8.17-management

注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。

2. 运行镜像

拉取成功之后,使用命令:

docker images

查看镜像是否拉取成功,如下就是成功了:
rabbitmq镜像
之前用的3.6.8的,不支持延时消息队列的插件…
然后运行镜像,创建容器并启动:

docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management

此时用:

docker ps -a

查看容器:
rabbitmq容器
容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
默认账号:guest,密码:guest
rabbitmq登录页面

3. 安装插件

此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行之后看到如下就成功了:
插件启动成功
成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
添加延时交换机
此时,我们的rabbitmq就配置完成了。

4. 代码支持

rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。

4.1 config文件

package com.thcb.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ的配置类** @author thcb* @date 2023-09-05*/
@Configuration
public class RabbitMqConfig {// 交换机private static final String DELAYED_EXCHANGE = "delayed.exchange";// 队列private static final String DELAYED_QUEUE = "delayed.queue";// 路由private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";/*** 队列*/@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}/*** 交换机*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);}/*** 绑定延迟队列和交换机*/@Beanpublic Binding delayQueueBindingDelayExchange() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();}}

4.2 消费监听

package com.thcb.rabbitmq.recevier;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费监听** @author thcb* @date 2023-09-05*/
@Slf4j
@Component
public class DelayQueueReceiver {@RabbitListener(queues = "delayed.queue")public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);}}

4.2 消息生产

这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。

package com.thcb.rabbitmq.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产controller** @author thcb* @date 2023-09-05*/
@RestController
@RequestMapping("/HelloController")
public class HelloController {private static final Logger log = LoggerFactory.getLogger(HelloController.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/sendXDLMessage1")@ResponseBodypublic String sendXDLMessage1() {int time = 5000;String message = "{\"type\":\"sendXDLMessage1\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage1 success";}@RequestMapping("/sendXDLMessage2")@ResponseBodypublic String sendXDLMessage2() {int time = 30000;String message = "{\"type\":\"sendXDLMessage2\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage2 success";}
}

5. 功能测试

代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
创建的交换机
创建的队列
可以看到交换机的类型是x-delayed-message。
接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
运行结果
结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。

镜像操作

使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
一切都准备就绪,就可以制作镜像了。

1. 镜像制作

将镜像打包成tar文件。

docker commit 【镜像id】 rabbitmq:3.8.17
docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17

2. 镜像导入

制作完镜像进行导入

docker load <rabbitmq-3.8.17.tar
docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17

总结

以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/99063.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue集成mars3d后,basemaps加不上去

首先&#xff1a; <template> <div id"centerDiv" class"mapcontainer"> <mars-map :url"configUrl" οnlοad"onMapload" /> </div> </template> <script> import MarsMap from ../component…

Vue + Element UI 前端篇(三):工具模块封装

Vue Element UI 实现权限管理系统 前端篇&#xff08;三&#xff09;&#xff1a;工具模块封装 封装 axios 模块 封装背景 使用axios发起一个请求是比较简单的事情&#xff0c;但是axios没有进行封装复用&#xff0c;项目越来越大&#xff0c;会引起越来越多的代码冗余&am…

Kotlin(五) 循环语句

目录 For循环 关键字 until step downTo Java中主要有两种循环语句&#xff1a;while循环和for循环。而Kotlin也提供了while循环和for循环&#xff0c;其中while循环不管是在语法还是使用技巧上都和Java中的while循环没有任何区别&#xff0c;因此我们就直接跳过不进行讲解…

小白学go基础06-了解切片实现原理并高效使用

slice&#xff0c;中文多译为切片&#xff0c;是Go语言在数组之上提供的一个重要的抽象数据类型。在Go语言中&#xff0c;对于绝大多数需要使用数组的场合&#xff0c;切片实现了完美替代。并且和数组相比&#xff0c;切片提供了更灵活、更高效的数据序列访问接口。 切片究竟是…

项目管理工具:实现项目科学管理的利器

什么是项目管理工具 项目管理工具是指用于协助规划、组织、执行和监控项目活动的软件或应用程序。它们提供了一系列功能和工具&#xff0c;帮助项目管理人员和团队有效地管理项目进度、资源分配、任务协作以及沟通等方面的工作。项目管理工具的目标是提高团队的工作效率、优化…

浅谈Mysql读写分离的坑以及应对的方案 | 京东云技术团队

一、主从架构 为什么我们要进行读写分离&#xff1f;个人觉得还是业务发展到一定的规模&#xff0c;驱动技术架构的改革&#xff0c;读写分离可以减轻单台服务器的压力&#xff0c;将读请求和写请求分流到不同的服务器&#xff0c;分摊单台服务的负载&#xff0c;提高可用性&a…

vue 浏览器记住密码后,自动填充账号密码错位

亲测有效&#xff01;&#xff01;! 遇到的场景&#xff1a; 浏览器记住密码后&#xff0c;登录时自动填充账号密码&#xff0c;由于登录时只需要这两个字段所以没问题&#xff0c;见图一&#xff0c;但注册时&#xff0c;账号密码不在一处&#xff0c;见图二 原本账号应该在…

windows编程之线程同步万字总结(创建线程,互斥对象,互斥事件,信号量,关键段,多线程群聊服务器)

文章目录 创建线程方法一_beginthreadex函数讲解使用示例&#xff1a; 方法二CreateThread函数讲解:使用示例: 互斥对象:创建互斥对象CreateMutex 互斥事件介绍创建或打开一个未命名的互斥事件对象 信号量介绍信号量的相关函数使用示例 关键段相关函数错误使用示例正确使用示例…

如何为 Flutter 应用程序创建环境变量

我们为什么需要环境变量&#xff1f; 主要用于存储高级机密数据&#xff0c;如果泄露可能会危及您产品的安全性。这些变量本地存储在每个用户的本地系统中&#xff0c;不应该签入存储库。每个用户都有这些变量的副本。 配置 在根项目中创建一个名为 .env 的文件夹&#xff08…

『PyQt5-Qt Designer篇』| 08 Qt Designer中容器布局和绝对布局的使用

08 Qt Designer中容器布局和绝对布局的使用 1 容器布局1.1 设计容器布局1.2 保存文件并执行2 绝对布局2.1 设计绝对布局2.2 保存文件并执行1 容器布局 1.1 设计容器布局 先拖入一个容器Frame容器,然后拖入几个控件: 把拖入的控件拖入容器中: 选中容器,右键-布局-栅格布局:…

基于OpenCV+LPR模型端对端智能车牌识别——深度学习和目标检测算法应用(含Python+Andriod全部工程源码)+CCPD数据集

目录 前言总体设计系统整体结构图系统流程图 运行环境Python 环境OpenCV环境Android环境1. 开发软件和开发包2. JDK设置3. NDK设置 模块实现1. 数据预处理2. 模型训练1&#xff09;训练级联分类器2&#xff09;训练无分割车牌字符识别模型 3. APP构建1&#xff09;导入OpenCV库…

把一般数据转换成因子数据格式,做单因子、债券对历史数据回测+获取curl命令+垃圾数据转换成标准行情数据(bardata)

下载curl软件&#xff0c;地址&#xff1a; curl for Windows for 64-bit下载好后解压到文件夹&#xff0c;将里面的bin文件添加到环境变量中&#xff0c;bon文件地址为&#xff1a;C:\Users\59980\curl-8.2.1_7-win64-mingw\bin 打开cmd&#xff0c;输入curl --help,出现下…