RabbitMQ消息队列

简介

MQ(message queue),从字面意思上看就个 FIFO 先入先出的队列,只不过队列中存放的内容是 message 而已,它是一种具有接收数据、存储数据、发送数据等功能的技术服务。

作用:流量削峰、应用解耦、异步处理。

在这里插入图片描述
生产者将消息发送到消息队列中,消息队列负责转发消息给消费者,消费者在处理完消息后会对消息队列进行应答,消息队列收到应答信息会将相应的消息进行丢弃。

批量应答会导致高并发时消息的丢失,所以尽力以channel.ack()进行手动应答。

docker安装

  1. 拉取镜像并后台运行
docker run -id --name=rabbitmq -v rabbitmq-home:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=yi -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq

需要将RABBITMQ_DEFAULT_USER、RABBITMQ_DEFAULT_PASS改成自己的用户名、密码。

  1. 开启manager插件,可以在网页进行管理。
 docker exec -it 容器id /bin/bash  #这里可以用docker ps 查询刚刚开启的容器id#进入容器后输入,开启rabbitmq-plugins enable rabbitmq_management

可以登录 http://服务器IP:15672 访问web管理界面,访问成功则代表开启成功。

JAVA环境搭建

jar包:

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version></dependency></dependencies>

Helloworld实例

生产者

public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");//获取连接Connection connection = connectionFactory.newConnection();//获取信道,一个连接中有多个信道Channel channel = connection.createChannel();//声明一个队列 String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsAMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";//(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送成功");}

消费者

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setUsername("yi");connectionFactory.setPassword("123456");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println(new String(message.getBody()));};CancelCallback cancelCallback=(String var1)->{System.out.println("消息消费被中断");};channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}

工作队列(任务队列)

RabbitMQ默认为工作队列模式,消费者C1,C2为竞争关系,接收到的消息将轮询发送给C1,C2处理,即C1一条C2一条依次循环。
在这里插入图片描述

手动应答ack

因为自动应答不会考虑消息是否处理成功,所以可能会导致消息丢失,需要在代码中将自动应答改为手动应答。批量应答在高并发的时候也容易丢失消息,也应该关闭。

生产者的代码无需修改。
消费者:

public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();System.out.println("work2 waiting:");DeliverCallback deliverCallback= (String s, Delivery delivery)->{System.out.println(new String(delivery.getBody()));// do something//手动回复ack,false为关闭批量应答channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=(s)->{System.out.println("消息被打断");};//false表示不自动应答ackchannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}

不公平分发

会存在有些线程能力差耗时长,有些能力强耗时短的情况,不公平分发将实现能者多劳。
设立channel的basicQos即可实现不公平分发, basicQos的数值意味着channel的最大存储上限,channel为1时,消费者最多同时缓存一条待处理消息。

channel.basicQos(1);

发布确认

在开启队列持久化、消息持久化后,RabbitMQ服务器仍然可能在将消息存储在磁盘前宕机,需要发布确认才能保证消息不丢失,即RabbitMQ在存储磁盘成功后,发送确认给生产者。

单个发布确认

每条消息存储在磁盘后进行发布确认,只有发送者在接收到消费者对应的发布确认消息后才会给此消费者发送下一条消息。

public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//开启持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());       boolean flag = channel.waitForConfirms(); //等待发布确认if(flag){System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布1000条耗时:"+(end-begin)+"ms");}

批量发布确认

每发送100条消息进行一次发布确认。速度快,但是不知道具体是哪一条消息发送失败了。

public static void publicMsgIndividual()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//开启持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());if(i%100==0){boolean flag = channel.waitForConfirms(); //等待发布确认if(flag){System.out.println("消息发送成功");}}}long end = System.currentTimeMillis();System.out.println("发布1000条耗时:"+(end-begin)+"ms");}

异步发布确认

推荐使用,需要加入确认发布监听器confirmListener,并且记录序列号与消息的关联(ConcurrentSkipListMap)。

 public static void publicMsgAsync()throws Exception{Channel channel = RabbitMQUtils.getChannel();String QUEUE_NAME = UUID.randomUUID().toString();//开启持久化channel.queueDeclare(QUEUE_NAME,true,false,false,null);channel.confirmSelect();//开启发布确认//       将序列号与信息相关联,ConcurrentSkipListMap concurrentSkipListMap = new ConcurrentSkipListMap<Long,String>();//加入确认监听器channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long msgTag, boolean multiply) throws IOException {System.out.println("消息发送成功:"+msgTag);if(multiply) { //如果是批量确认,批量删除//headMap返回小于msgTag的map视图ConcurrentNavigableMap concurrentNavigableMap = concurrentSkipListMap.headMap(msgTag);//清理已经标记的MapconcurrentNavigableMap.clear();}else {concurrentSkipListMap.remove(msgTag);}}@Overridepublic void handleNack(long msgTag, boolean multiply) throws IOException {System.out.println("未确认的消息:"+concurrentSkipListMap.get(msgTag));}});long begin = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {channel.basicPublish("",QUEUE_NAME,null, new String(i+" ").getBytes());//记录发送的信息与其序列号concurrentSkipListMap.put(channel.getNextPublishSeqNo(),new String(i+" "));}long end = System.currentTimeMillis();System.out.println("发布1000条耗时:"+(end-begin)+"ms");}

发布/订阅模式

首先要弄明白交换机和队列的关系,交换机负责信息的接收,通过不同的RountingKey将消息转发到不同的队列,每个队列上的接收者都是竞争关系(即队列上的消息只会被处理一次),那么当一个交换机对应多个队列时,每个队列仅有一个消费者,这个时候即发布/订阅模式,消息会被每个消费者接收。
在这里插入图片描述

生产者代码:向交换机中发送消息

public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish(EXCHANGE_NAME,"", null,next.getBytes());}}

消费者代码:声明匿名队列,将队列绑定到交换机上,不同的消费者用相同的RountingKey,以便同时接收到消息。

public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //FANOUT煽出,就是发布订阅模式String queue = channel.queueDeclare().getQueue(); //声明匿名队列channel.queueBind(queue,EXCHANGE_NAME,""); //将队列绑定到交换机上,RountingKey为“”DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println("接收到消息:"+new String(message.getBody()));};channel.basicConsume(queue,true,deliverCallback, (consumerTag)->{});}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/212559.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈Python中的鸭子类型和猴子补丁

文章目录 前言一、鸭子类型二、猴子补丁关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资料六、Python兼职渠道 前言 Python 开发者可能…

使用vcpkg安装库失败的解决方法

1、前言 vcpk是是一款开源的c/c库管理工具&#xff0c;尤其是在windows平台&#xff0c;可以帮助我们很好的管理各种依赖包。 在windows环境做c/c开发的人应该都深有体会&#xff0c;有时候编译需要下载一堆依赖库&#xff0c;导致搭建编译环境特别麻烦。但是&#xff0c;通过v…

安全地公网访问树莓派等设备的服务 内网穿透--frp 23年11月方法

如果想要树莓派可以被公网访问&#xff0c;可以选择直接网上搜内网穿透提供商&#xff0c;一个月大概10块钱&#xff0c;也有免费的&#xff0c;但是免费的速度就不要希望很好了。 也可以选择接下来介绍的frp&#xff0c;这种方式不需要付费&#xff0c;但是需要你有一台有着公…

案例025:基于微信小程序的移动学习平台的设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

【Flink】状态管理

目录 1、状态概述 1.1 无状态算子 1.2 有状态算子 2、状态分类 ​编辑 2.1 算子状态 2.1.1 列表状态&#xff08;ListState&#xff09; 2.1.2 联合列表状态&#xff08;UnionListState&#xff09; 2.1.3 广播状态&#xff08;BroadcastState&#xff09; 2.2 按键分…

案例012:Java+SSM+uniapp基于微信小程序的科创微应用平台设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

oracle数据库巡检常见脚本-系列二

简介 作为数据库管理员&#xff08;DBA&#xff09;&#xff0c;定期进行数据库的日常巡检是非常重要的。以下是一些原因&#xff1a; 保证系统的稳定性&#xff1a;通过定期巡检&#xff0c;DBA可以发现并及时解决可能导致系统不稳定的问题&#xff0c;如性能瓶颈、资源利用率…

2023亚太杯数学建模C题思路 - 我国新能源电动汽车的发展趋势

1 赛题 问题C 我国新能源电动汽车的发展趋势 新能源汽车是指以先进技术原理、新技术、新结构的非常规汽车燃料为动力来源( 非常规汽车燃料指汽油、柴油以外的燃料&#xff09;&#xff0c;将先进技术进行汽车动力控制和驱动相结 合的汽车。新能源汽车主要包括四种类型&#x…

【libGDX】使用Mesh绘制立方体

1 前言 本文主要介绍使用 Mesh 绘制立方体&#xff0c;读者如果对 Mesh 不太熟悉&#xff0c;请回顾以下内容&#xff1a; 使用Mesh绘制三角形使用Mesh绘制矩形使用Mesh绘制圆形 在绘制立方体的过程中&#xff0c;主要用到了 MVP &#xff08;Model View Projection&#xff0…

Ubuntu20.04 install pnpm

npm install -g pnpm referrence link: Installation | pnpmPrerequisiteshttps://pnpm.io/installation

芯能科技-603105 三季报分析(20231123)

芯能科技-603105 基本情况 公司名称&#xff1a;浙江芯能光伏科技股份有限公司 A股简称&#xff1a;芯能科技 成立日期&#xff1a;2008-07-09 上市日期&#xff1a;2018-07-09 所属行业&#xff1a;电气机械和器材制造业 周期性&#xff1a;1 主营业务&#xff1a;分布式光伏解…

适用于电脑的5个免费文件恢复软件分享

适用于电脑的最佳免费文件恢复软件 任何计算机用户都可能经历过丢失重要文件的恐惧。重要数据的丢失可能会令人不安和沮丧&#xff0c;无论是由于不小心删除、计算机故障还是硬盘格式化造成的。幸运的是&#xff0c;在数字时代&#xff0c;您可以使用值得信赖的解决方案检索这些…