RabbitMQ的安装使用

RabbitMQ是什么?

MQ全称为Message Queue,消息队列,在程序之间发送消息来通信,而不是通过彼此调用通信。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

为什么使用RabbitMQ?

优点:
1、实现应用系统的解耦,客户端只关心发送消息,而不关心处理。
2、异步提升效率,在主业务逻辑发送消息,异步去处理消息
3、流量削峰,将请求放到mq消息队列中,mysql每秒去拉取请求消费,避免请求全部一下子全部打到mysql,请求过多而崩溃

怎么使用RabbitMQ?

1.安装windows的客户端,参考链接3

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.java 代码引入相关jar包
		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.3.RELEASE</version></dependency>
3.编写发送,接收消息的工具类
延迟队列配置
package com.next.mq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @desc 延迟队列配置*/
@Configuration
public class RabbitDelayMqConfig {@Bean("delayDirectExchange")public DirectExchange delayDirectExchange() {DirectExchange directExchange = new DirectExchange(QueueConstants.DELAY_EXCHANGE, true, false);//交换机开启延迟设置true,延迟才会生效directExchange.setDelayed(true);return directExchange;}@Bean("delayNotifyQueue")public Queue delayNotifyQueue() {return new Queue(QueueConstants.DELAY_QUEUE);}@Bean("delayBindingNotify")public Binding delayBindingNotify(@Qualifier("delayDirectExchange") DirectExchange delayDirectExchange,@Qualifier("delayNotifyQueue") Queue delayNotifyQueue) {return BindingBuilder.bind(delayNotifyQueue).to(delayDirectExchange).with(QueueConstants.DELAY_ROUTING);}
}
队列配置
package com.next.mq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** @desc 队列配置*/
@Configuration
public class RabbitMqConfig {@Bean("directExchange")@Primarypublic DirectExchange directExchange() {return new DirectExchange(QueueConstants.COMMON_EXCHANGE, true, false);}@Bean("notifyQueue")@Primarypublic Queue notifyQueue() {return new Queue(QueueConstants.COMMON_QUEUE);}@Bean("bindingNotify")@Primarypublic Binding bindingNotify(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("notifyQueue") Queue notifyQueue) {return BindingBuilder.bind(notifyQueue).to(directExchange).with(QueueConstants.COMMON_ROUTING);}
}
发送消息工具类
package com.next.mq;import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.UUID;/*** @desc 客户端工具类 -- 发送消息*/
@Component
@Slf4j
public class RabbitMqClient {@Resourceprivate RabbitTemplate rabbitTemplate;//发送同步消息public void send(MessageBody messageBody) {try {//生成唯一的消息idString uuid = UUID.randomUUID().toString();//初始话消息CorrelationData correlationData = new CorrelationData(uuid);//使用模板工具类rabbitTemplate 来发消息rabbitTemplate.convertAndSend(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING,JsonMapper.obj2String(messageBody), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//记录日志log.info("message send, {}", message);return message;}}, correlationData);} catch (Exception e) {//日志打印,以便定位问题log.error("message send exception, msg:{}", messageBody.toString(), e);}}/*** @desc 发送延迟消息*/public void sendDelay(MessageBody messageBody, int delayMillSeconds) {try {//设置消息延迟时间messageBody.setDelay(delayMillSeconds);String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);//延迟交换机和路由rabbitTemplate.convertAndSend(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING,JsonMapper.obj2String(messageBody), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化//设置消息延迟的时间(毫秒值)message.getMessageProperties().setDelay(delayMillSeconds);log.info("delay message send, {}", message);return message;}}, correlationData);} catch (Exception e) {log.error("delay message send exception, msg:{}", messageBody.toString(), e);}}
}
接收消息工具类
package com.next.mq;import com.next.dto.RollbackSeatDto;
import com.next.model.TrainOrder;
import com.next.service.TrainOrderService;
import com.next.service.TrainSeatService;
import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.type.TypeReference;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @desc rabbitmq的server端 - 延迟接收消息* 用处:在主流程里面发送消息,异步流程里面接收消息,处理。提升代码性能*/@Component
@Slf4j
public class RabbitDelayMqServer {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainOrderService trainOrderService;@RabbitListener(queues = QueueConstants.DELAY_QUEUE)public void receive(String message) {log.info("delay queue receive message, {}", message);try {MessageBody messageBody = JsonMapper.string2Obj(message, new TypeReference<MessageBody>() {});if (messageBody == null) {return;}switch (messageBody.getTopic()) {case QueueTopic.SEAT_PLACE_ROLLBACK:RollbackSeatDto dto = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<RollbackSeatDto>() {});trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(), messageBody.getDelay());break;case QueueTopic.ORDER_PAY_DELAY_CHECK:TrainOrder trainOrder = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<TrainOrder>() {});trainOrderService.delayCheckOrder(trainOrder);break;default:log.warn("delay queue receive message, {}, no need handle", message);}} catch (Exception e) {log.error("delay queue message handle exception, msg:{}", message, e);}}
}

参考链接:
1.rabbitMQ到底是个啥东西?
2.超详细!!!Windows下安装RabbitMQ的步骤详解
3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)
4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/421882.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot的SSM整合案例

项目目录: 数据库表以及表结构 user表结构 user_info表结构 引入依赖 父模块依赖: <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.12.RELEASE</version>…

Conmi的正确答案——eclipse C/C++显示“未解析的包含:<xxx.h>”/“Unresolved inclusion: <xxx.h>”

eclipse IDE 版本&#xff1a;2023-12 部分采自&#xff1a;解决方法&#xff1a;关于问题 “C - Unresolved inclusion: <iostream>” 解释事项&#xff1a;方法一可能版本不同&#xff0c;部分界面修改了。这里使用的是方法二的解决方法。&#xff08;或者各位大神的描…

最新多线程版 FFmpeg 剖析

FFmpeg近期推出了一个重要Feature&#xff0c;即将原来的 FFmpeg 命令行工具由单线程变成了多线程。 ffmpeg -i input.mp4 -c:v libx264 -crf 23 out.mp4 如上面的命令&#xff0c;以前使用上面命令进行转码时&#xff0c;由于它是单线程工作模式&#xff0c;因此只能利用一个…

写一个简单的python服务测试

1&#xff1a;引用sanic pip install sanic 2: 引用sanic 跨域 3&#xff1a; #-*- coding: UTF-8 -*- # !/usr/bin/python # time :2022/2/22 21:43 # author :Mo # function :get/post of sanicfrom sanic.response import json, text from sanic import Sanic, req…

vs2022配置OpenCV测试

1&#xff0c;下载Opencv安装包 OpenCV官网下载地址&#xff1a;Releases - OpenCV 大家可以按需选择版本进行下载&#xff0c;官网下载速度还是比较慢的&#xff0c;推荐大家使用迅雷进行下载 下载安装包到自定义文件夹下 双击安装 按以下图示进行安装 2、 添加环境变量 打…

AI 欺诈事件频出,如何重塑身份认证的安全性?

据报告表示&#xff0c;生成式人工智能每年可为世界经济注入相当于 4.4 万亿美元的资金。预计到 2030 年&#xff0c;人工智能对全球财政的潜在贡献将达到 15.7 万亿美元。人们惊叹于 AI 强大工作效率&#xff0c;期待能帮忙节省不必要的劳动力&#xff0c;但事实上 AI 出现之后…

unity项目《样板间展示》开发:素材导入与整理

第一章&#xff1a;素材导入与整理 前言一、创建项目文件二、导入素材模型三、素材模型整理四、光源模型管理结语 前言 这次带大家从0到1做一个unity项目&#xff1a;《样板间展示》。 顾名思义&#xff0c;项目内容是展示样板间&#xff0c;即玩家可以与房间中的物体、家具进行…

【关于镜像】的几个常见问题解答

实例释放后&#xff0c;备份镜像是否还存在​ 您好&#xff0c;实例未释放前&#xff0c;通过控制台-》备份镜像后&#xff0c;备份镜像会存储在镜像管理中&#xff0c;这时候释放当前实例不影响已备份镜像&#xff0c;备份镜像还存在。 备份镜像过大&#xff0c;如何优化&am…

网络安全全栈培训笔记(55-服务攻防-数据库安全RedisHadoopMysqla未授权访问RCE)

第54天 服务攻防-数据库安全&Redis&Hadoop&Mysqla&未授权访问&RCE 知识点&#xff1a; 1、服务攻防数据库类型安全 2、Redis&Hadoop&Mysql安全 3、Mysql-CVE-2012-2122漏洞 4、Hadoop-配置不当未授权三重奏&RCE漏洞 3、Redis-配置不当未授权…

反汇编 - 相关工具和intel指令集结构介绍

目录 1.反汇编 2.反汇编相关工具介绍 3.Interl指令集结构 3.1 Instruction Prefixes&#xff1a;指令前缀 3.2 Opcode&#xff1a;指令操作码 3.3 Mode R/M&#xff1a;操作数类型 3.4 SIB&#xff1a;辅助Mode R/M&#xff0c;计算地址偏移 3.5 Displacement&#xff…

17.JVM-[一篇通]

文章目录 JVM1.JVM 简介 (一个进程有一个JVM)1.1JVM 发展史1.2 JVM 和《Java虚拟机规范》 2.JVM 运行流程2.1JVM 执行流程 3.JVM 运行时数据区3.1 堆&#xff08;线程共享 一个进程只有一份堆&#xff09;3.2Java虚拟机栈&#xff08;线程私有 每个线程都有一份属于自己的栈&am…

【机器学习300问】15、什么是逻辑回归模型?

一、逻辑回归模型是为了解决什么问题&#xff1f; 逻辑回归&#xff08;Logistic Regression&#xff09;是一种广义线性回归分析模型&#xff0c;尤其适用于解决二分类问题&#xff08;输出为两个类别&#xff09;。 &#xff08;1&#xff09;二分类举例 邮件过滤&#xff…