RabbitMq应用延时消息

一.建立绑定关系

package com.lx.mq.bind;import com.lx.constant.MonitorEventConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author liuweiping.com* @version 1.0* @date 2023-06-26 10:04:03*/
@Slf4j
@Configuration
public class MonitorRabbitMqBinding {@Value(value = "-${spring.profiles.active}")private String profile;/*** Description: 延迟消息 <br/>* Created By: liu wei ping <br/>* Creation Time: 2023年6月26日 下午6:59:43 <br/>* <br/>* @return <br/>*/@Bean("delayExchange")public CustomExchange buildDelayedMessageNoticeExchange(){Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile, "x-delayed-message", Boolean.FALSE, Boolean.FALSE, args);}@Beanpublic Queue buildDelayedMessageNoticeQueue(){return QueueBuilder.durable(MonitorEventConst.MONITOR_DELAYED_MESSAGE_QUEUE + profile).build();}@Beanpublic Binding buildDelayedMessageNoticeBinding(){return BindingBuilder.bind(buildDelayedMessageNoticeQueue()).to(buildDelayedMessageNoticeExchange()).with(MonitorEventConst.MONITOR_DELAYED_MESSAGE_ROUTING_KEY).noargs();}/*** 交车完成事件消息定时处理队列*/@Beanpublic Queue deliveryCompleteEventHandQueue() {return QueueBuilder.durable(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + profile).build();}/*** 交车完成事件消息定时处理队列绑定*/@Beanpublic Binding deliveryCompleteBinding() {return BindingBuilder.bind(deliveryCompleteEventHandQueue()).to(buildDelayedMessageNoticeExchange()).with(MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY).noargs();}}

二.建立生产者

1.消息实体

package com.lx.dto.monitor;import lombok.Data;import java.util.Date;/*** @author liuweiping.com* @version 1.0* @date 2023-06-26 10:11:06*/
@Data
public class MonitorEventMessage {/*** 事件id*/private String eventId;/*** 事件编码*/private String eventCode;/*** 业务数据*/private String businessUniqueKey;/*** 业务类型*/private String businessType;/*** 到期时间*/private Long expireMillis;/***  时间处理唯一版本号*/private Integer eventHandVersion;/*** 定时处理时间*/private Date timedOperationTime;public void setTimedOperationTime(Date timedOperationTime) {this.timedOperationTime = timedOperationTime;expireMillis = timedOperationTime.getTime() - new Date().getTime();if (expireMillis < 0) {expireMillis = 0L;}}
}
package com.lx.mq.producer;import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 监控事件消息发送类*/
@Slf4j
@Component
public class MonitorEventMessageProducer {@Value(value = "-${spring.profiles.active}")private String profile;@Autowiredprivate RabbitTemplate rabbitTemplate;/***  交车完成监控事件定时发送*/public void sendDeliveryCompleteEventHandMessage(MonitorEventMessage monitorEventMessage) {String message = JsonUtil.toJson(monitorEventMessage);;rabbitTemplate.convertAndSend(MonitorEventConst.MONITOR_DELAYED_MESSAGE_EXCHANGE + profile,MonitorEventConst.DELIVERY_COMPLETE_DELAYED_ROUTING_KEY,message,msg -> {msg.getMessageProperties().setDelay(monitorEventMessage.getExpireMillis().intValue());return msg;});log.info("sending event processing messages: {}", message);//发送事件处理消息}}

三.建立消费者

package com.lx.mq.consumer;import com.lx.constant.MonitorEventConst;
import com.lx.designPattern.strategypattern.workorderbase.service.sysField.JsonUtil;
import com.lx.dto.monitor.MonitorEventMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** 监控事件消息发送类*/
@Slf4j
@Component
public class MonitorEventMessageConsumer {@Value(value = "-${spring.profiles.active}")private String profile;/***  交车完成事件处理mq监听*/@RabbitListener(queues = MonitorEventConst.DELIVERY_COMPLETE_DELAYED_QUEUE + "-${spring.profiles.active}")public void dealWithDeliveryCompleteEventHandMessage(String eventMessage, Channel channel, Message message) {log.info("dealWithDeliveryCompleteEventHandMessage:【{}】", JsonUtil.toJson(eventMessage));String str = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Received the message of regular loading and unloading of goods: {}", str); //收到商品定时上下架消息MonitorEventMessage monitorEventMessage = JsonUtil.toBean(eventMessage, MonitorEventMessage.class);try {analyzeHand(monitorEventMessage);}catch (Exception e){log.error("交车完成事件分析失败,参数:{},e:{}",JsonUtil.toJson(monitorEventMessage),JsonUtil.toJson(e));}}/***  事件分析* @param monitorEventMessage*/private void  analyzeHand(MonitorEventMessage monitorEventMessage) throws Exception {}
}

四.测试类测试

package com.lx.controller;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;import com.lx.conf.MQConfig;
import com.lx.conmon.ResultData;
import com.lx.dto.monitor.MonitorEventMessage;
import com.lx.mq.producer.MonitorEventMessageProducer;
import com.lx.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import com.lx.constant.RedisMange;
import com.lx.utils.RedisUtil;
import org.thymeleaf.util.DateUtils;/*** @Author : liu wei ping* @CreateTime : 2019/9/3* @Description :**/@RestController
public class SendMessageController {@Autowiredprivate MonitorEventMessageProducer messageProducer;@GetMapping("/sendTopicMessage3")public ResultData<String> sendTopicMessage3() {MonitorEventMessage monitorEventMessage = new MonitorEventMessage();monitorEventMessage.setEventCode("delivery");//设置定时处理时间= 当前时间+ 定时处理时长monitorEventMessage.setTimedOperationTime(DateUtil.date(DateUtil.getCurrentMillis() + 30 * 1000));monitorEventMessage.setBusinessType("deliveryType");messageProducer.sendDeliveryCompleteEventHandMessage(monitorEventMessage);return new ResultData<>("ok");}
}

五.效果如图所示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/4913.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【计算机网络】可靠传输的实现机制

参考视频 https://www.bilibili.com/video/BV1c4411d7jb 1、停止-等待协议SW (Stop-and-Wait) 1.1 信道利用率 1.2 题目 1.3 小结 2.回退N帧协议GBN (Go-Back-N) 1.1 题目 1.2 小结 3.选择重传协议SR (Selective-Repeat) 3.1 过程 3.2 发送窗口 和 接收窗口尺寸范围 4.小结 5.…

MATLAB 之 Simulink系统的仿真与分析

这里写目录标题 一、Simulink 系统的仿真与分析1. 设置仿真参数1.1 Solver 参数设置1.2 Data lmport/Export 参数设置 2. 运行仿真与仿真结果分析2.1 运行仿真2.2 仿真结果分析 一、Simulink 系统的仿真与分析 系统的模型建立之后&#xff0c;选择仿真参数和数值算法&#xff…

ModaHub魔搭社区:向量数据库Milvus性能调优教程(一)

目录 性能调优 插入性能调优 查询性能调优 硬件环境 系统参数 性能调优 插入性能调优 “数据插入”到“数据写入磁盘”的基本流程请参考 存储操作。 如果数据量小于单次插入上限&#xff08;256 MB&#xff09;&#xff0c;批量插入比单条插入要高效得多。 系统配置中…

three.js点材质(PointsMaterial)常用属性设置

一、前景回顾 上一章节简单介绍了下怎么使用点材质和点对象创建物体点对象和点材质介绍 点材质和点对象基本运用示例代码&#xff1a; import * as THREE from "three"; // 导入轨道控制器 import { OrbitControls } from "three/examples/jsm/controls/Orbit…

沉浸式三维虚拟展厅交互体验科技感十足

随着科技的不断发展进步&#xff0c;展厅的表现形式也变得多样化&#xff0c;紧跟时代发展步伐&#xff0c;迭代创新。 3D虚拟展厅具有四大优势 一、降低成本&#xff0c;提高效率 3D“VR线上展厅”将艺术优势资源转到线上搭建的艺术线上展平台&#xff0c;相对传统艺术展来说有…

基于redis的bitmap实现签到功能(后端)

项目环境 MacOS springboot: 2.7.12 JDK 11 maven 3.8.6 redis 7.0.11 StringRedisTemplate 的key和value默认都是String类型 可以避免不用写配置类&#xff0c;定义key和value的序列化。 实现逻辑&#xff1a; 获取用户登录信息 根据日期获取当天是多少号 构建…

Cetos7.x连接不上网络解决办法

Cetos7.x连接不上网络解决办法 Cetos7.x连接不上网络解决办法 在VM中设置网络连接为桥接&#xff0c;修改后仍无法连接网络 ##配置centos7中ens33&#xff0c;将默认的no修改为yes 启动CentOS系统&#xff0c;并打开一个连接终端会话&#xff0c;使用root登录&#xff1b;进…

Vue 时间格式转换

文章目录 将秒转换成简单时间格式方式一 表格渲染方式二 js转换 将时间转换为字符串方式一 年、月、日、时、分、秒、星期等信息方式二 返回多久之前的时间 将秒转换成简单时间格式 方式一 表格渲染 element-ui 表格为例&#xff0c;duration 单位为秒 <el-table-column …

Css设置border从中间向两边的颜色渐进效果

Css设置border从中间向两边的颜色渐进效果 .list-item {border-bottom: 1rpx solid;border-image: linear-gradient(to right, rgba(0,0,0,.1) 0%, rgba(81, 110, 197, 0.76) 50%, rgba(0,0,0,.1) 100%) 1;}效果如图&#xff1a;

5.MySQL索引事务

文章目录 &#x1f43e;1. 索引&#x1f43e;&#x1f490;1.1 概念&#x1f490;&#x1f338;1.2 作用与缺点&#x1f338;&#x1f337;1.2.1作用&#x1f337;&#x1f340;1.2.2缺点&#x1f340; &#x1f339;1.3 使用场景&#x1f339;&#x1f33b;1.4 使用&#x1f3…

新项目即将启动!小灰做个市场调研

熟悉小灰的小伙伴们都知道&#xff0c;在2019年初&#xff0c;做了整整10年程序员的小灰离开职场&#xff0c;成为了一名自由职业者。 2021年末&#xff0c;小灰注册了自己的公司&#xff0c;名为北京小灰大黄科技有限公司。 公司虽然注册了&#xff0c;但是整个公司只有小灰一…

Java注解

一、什么是注解 注解是放在Java源码的类、字段、方法、参数前的一种特殊的“注释”&#xff0c;和普通注释的区别是&#xff0c;普通注释被编译器直接忽略&#xff0c;注解则可以被编译器打包进入Class文件。如下图所示就是lombok中的一些注解。 注解的作用&#xff1a; 从JVM角…