SpringBoot整合Canal+RabbitMQ监听数据变更(对rabbit进行模块封装)

SpringBoot+Canal(监听MySQL的binlog)+RabbitMQ(处理保存变更记录)

在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。
使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是又结合了RabbitMQ来处理保存变更记录的操作。

  • 启动MySQL环境,并开启binlog
  • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
  • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
  • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

预先在model实体中准备

短信实体

@Data
@ApiModel(description = "短信实体")
public class MsmVo{@ApiModelProperty(value="phone")private String phone;@ApiModelProperty(value = "短信模板code")private  String templateCode;@ApiModelProperty(value="短信模板参数")private Map<String,Object> param;
}

排班实体

@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo{@ApiModelProperty(value="可预约数")private Integer reserverdNumber@ApiModelProperty(value = "剩余预约数")private Integer availableNumber;@ApiModelProperty(value = "排班id")private String scheduleId;@ApiModelProperty(value = "短信实体")private MsmVo msmVo;
}

一、安装RabbitMQ

docker pull rabbitmq:nanagemnet
docker run -d -p 5672:5672 -p 12672:15672 --name rabbitmq rabbitmq:nanagement

访问:http://IP:15672
在这里插入图片描述

二、rabbit-util模块封装

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

创建一个RabbitService用来发送消息

@Service
public class RabbitService{@Autowiredprivate RabbitTemplate rabbitTemplate;//发送消息public boolean sendMessage(String exchange,String routingKey,Object message){rabbitTemplate.convertAndSend(exchange,routingKey,message);return true;}
}

创建mq消息转化器

@Configuration
public class MQConfig{@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

添加常量配置类

public class MqConst{//预约下单public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";public static final String ROUTING_ORDER = "order";//队列public static final String QUEUE_ORDER = "queue.order";//短信public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";public static final String ROUTING_MSM_ITEM = "msm.item";pulib static final String Queue_MSM_item = "queue.msm.item";
}

三、短信模块service-sms

将二中的模块依赖引入

<dependency><groupId>com.michael</groupId><artifactId>rabbit_util</artifactId><version>xxx</version>
</dependency>

配置文件application.properties

spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbit.uername=guest
spring.rabbitmq.password=guest

Service发送消息

public interface MsmService{//发送手机验证码boolean send(String phone,String code);//MQ使用发送短信的接口boolean send(MsmVo msmVo);
}
@Service
public class MsmServiceImpl implements MsmService{@Overridepublic boolean send(String phone,String code){//判断手机号是否为空if(StringUtils.isEmpty(phone)){return false;}//整合阿里云相关参数,短信服务DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,ConstantPropertiesUtils.ACCESS_KEY_ID,ConstantPropertiesUtils.SECRET);IAcsClient client = new DefaultAcsClient(profile);CommonRequest request = new CommonRequest();request.setMethod(MethodType.POST);request.setDomain("dysmsapi.aliyuncs.com");request.setVersion("2018-08-08");request.setAction("SendSms");//手机号request.putQueryParameter("PhoneNumbers",phone);//签名名称request.putQueryParameter("SignName","我的网站");//模板request.putQueryParameter("TemplateCode","SMS_180051135");//验证码使用json格式{"code":"123456"}Map<String,Object> param = new HashMap();param.put("code",code);request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));//调用方法进行短信发送try{CommonResponse response = client.getCommonResponse(request);System.out.println(response.getData());return response.getHttpResponse().isSuccess();}catch(ServerException e){e.printStackTrace();}catch(ClientException e){e.printStackTrace();}return false;}@Overridepublic boolean send(MsmVo msmVo){if(!StringUtils.isEmpty(msmVO.getPhone())){String code = (String)msmVo.getParam().get("code");boolean isSend = this.send(msmVo.getPhone(),code);return isSend;}return false;}
}

创建mq监控器

@Component
public class MsmReceiver{@Autowiredprivate MsmService msmService;//监听@RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),key = {MqConst.ROUTING_MSM_ITEM}))public void send(MsmVo msmVo,Message message,Channel channel){msmService.ssend(msmVo);}
}

四、业务类

生成订单之后,发送短信并更新数量

①、业务模块中引入依赖

rabbit-util

②、添加配置

spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

③、service接口以及实现类

@Override
public void update(Schedule schedule){schedule.setUpdata(new Date());scheduleRepository.save(schedule);
}

④、receiver包中创建MQ监听器

@Component
public class HospitalReceiver{@Autowiredprivate ScheduleService scheduleService;@Autowiredprivate RabbitService rabbitService;//监听@RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),key = {MqConst.ROUTING_ORDER}))public void receiver(OrderMqVo orderMqVo,Message message,Channel channle) throws IOException{//下单成功,更新数据Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());schedule.setReservedNumber(orderMqVo.getReservedNumber());schedule.setAvailableNumber(orderMqVo.getAvailableNumber);scheduleService.update(schedule);//发送短信MsmVo msmVo = orderMqVo.getMsmVo();if(null != msmVo){rebbitService.sendMessage(MqConst.QUEUE_MSM_ITEM,MqConst.ROUTING_MSM_ITEM,msmVo);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/165172.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go进阶之rpc和grpc

文章目录 Go环境安装1&#xff09;windows2&#xff09;linux go语言编码规范1.1 包名&#xff1a;package1.2 ⽂件名1.3 结构体命名1.4 接⼝命名1.5 变量命名1.6 常量命名2.1 包注释2.2 结构&#xff08;接⼝&#xff09;注释2.3 函数&#xff08;⽅法&#xff09;注释2.4 代码…

四川芸鹰蓬飞:抖音短视频运营是做什么的?

抖音短视频作为一种新兴的社交媒体平台&#xff0c;它的运营团队肩负着将用户需求与平台资源相结合&#xff0c;促使平台发展壮大的重要任务。抖音短视频运营旨在通过精准的用户分析和有针对性的内容推送&#xff0c;提高用户留存和活跃度&#xff0c;增加广告收入&#xff0c;…

vue3怎么获取el-form的元素节点

在元素中使用ref设置名称 在ts中通过从element-plus引入formInstance,设置formRef同名名称字段来获取el-form节点

网页制作-引入icon

1.如何引入icon 1.1 进入https://www.iconfont.cn/ 1.2 登录或者注册一下 1.3 在搜索框输入你想搜索的内容 1.4 加入购物车 1.5 在购物车中点击下载代码 1.6 若是普通的html项目&#xff0c;则至需要将如下两个拷贝到你的项目中 1.7 在你需要的网页中引入iconfont.css就可以使…

【Proteus仿真】【Arduino单片机】数码管显示

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使用TM1637、共阳数码管等。 主要功能&#xff1a; 系统运行后&#xff0c;数码管显示数字、字符。 二、软件设计 /* 作者&#xff1a;嗨小易&am…

利用QT画图像的直方图

1.什么是直方图 直方图是一种图形化展示数据频率分布的方式。它将样本数据分成一系列相邻的区间&#xff0c;统计每个区间内数据所占比例或数量&#xff0c;并用矩形条形图表现出来。直方图可以反映样本数据的分布情况&#xff0c;例如它们的集中趋势、对称性和离散程度等。 …

【JAVA学习笔记】66 - 本章作业(IO流)

项目代码 https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter19/src/com/yinhai/homework 1.使用File类和FileWriter类 (1)在判断e盘下是否有文件夹mytemp&#xff0c;如果没有就创建mytemp public class Homework01 {public static void main(String…

小程序 打开方式 页面效果 表单页面 点击跳到详情页 图标 获取后台数据 进行页面渲染

请求地址&#xff1a;geecg-uniapp 同源策略 数据请求 获取后台数据 ui库安装 冲突解决&#xff08;3&#xff09;-CSDN博客 一.uniapp转小程序 (1) 运行微信开发工具 &#xff08;2&#xff09; 配置id 然后运行 打开小程序 路径 E:\通\uniapp-jeecg\unpackage\dist\d…

yum工具的使用

yum工具的使用 rpm的弊端 前面我们讲了下rpm&#xff0c;那么rpm有什么弊端呢&#xff1f;其弊端是显而易见的&#xff0c;当用rpm安装软件时&#xff0c;若遇到有依赖关系的软件&#xff0c;必须先安装依赖的软件才能继续安装我们要安装的软件&#xff0c;当依赖关系很复杂的…

ci-cd的流程

1、项目在gitlab上&#xff0c;从gitlab上使用git插件获取源码&#xff0c;构建成war包&#xff0c;所以使用tomcat作为运行环境 发布 &#xff1a;使用maven插件发布&#xff0c;使用ssh连接。

Bun 1.0.7 版本发布,实现多个 Node.js 兼容改进

导读Bun 是一个集打包工具、转译器和包管理器于一体的 JavaScript 运行时&#xff0c;由 Jarred Sumner 发布了 1.0.7 版本。本次更新实现了对 Node.js 运行时的多项兼容性改进&#xff0c;并修复了近 60 个 bug。 根据发布说明&#xff0c;本版本对 “bun install” 命令进行…

尚硅谷大数据项目《在线教育之实时数仓》笔记007

视频地址&#xff1a;尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili 目录 第9章 数仓开发之DWD层 P053 P054 P055 P056 P057 P058 P059 P060 P061 P062 P063 P064 P065 第9章 数仓开发之DWD层 P053 9.6 用户域用户注册事务事实表 9.6.1 主要任务 读…