RabbitMQ - 简单案例

目录

0.引用

1.Hello world

2.轮训分发消息

  2.1 抽取工具类

  2.2 启动两个工作线程接受消息

  2.4 结果展示

3.消息应答

  3.1 自动应答

  3.2 手动消息应答的方法

   3.3 消息自动重新入队

  3.4 消息手动应答代码

4.RabbitMQ 持久化

  4.1 队列如何实现持久化

  4.2 消息实现持久化

 5.不公平分发

6.预取值分发


0.引用

https://note.oddfar.com/rabbitmq/

1.Hello world

  1.1 依赖引用

<dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency>
</dependencies>

  1.2 消息生产者

package com.example.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/*** 生成一个队列* 1.QUEUE_NAME 队列名称* 2.durable 队列里面的消息是否持久化 也就是是否用完就删除* 3.exclusive 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.autoDelete是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/Boolean durable = true;Boolean exclusive = false;Boolean autoDelete = false;Map<String, Object> arguments = null;channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete, null);String message = "hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");}}

  1.3 消息消费者

package com.example.one;
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息.........");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback = (consumerTag) -> {System.out.println("消息消费被中断");};/*** 消费者消费消息 - 接受消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调* 4.消息被取消时的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}}

2.轮训分发消息

  2.1 抽取工具类

package com.example.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.2.17");factory.setUsername("admin");factory.setPassword("admin");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

  2.2 启动两个工作线程接受消息

package com.example.two;import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//消息接受DeliverCallback deliverCallback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:" + receivedMessage);};//消息被取消CancelCallback cancelCallback = (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费.................. ");channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

选中 Allow multiple instances

image-20210627125840217

 启动后

image-20210627130146584

   2.3 启动一个发送消息线程

public class Task01 {public static final String QUEUE_NAME = "quque";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完成:" + message);}}
}

  2.4 结果展示

        通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息

3.消息应答

  3.1 自动应答

        消息发送后立即被认为已经传送成功

  3.2 手动消息应答的方法

  • Channel.basicAck(用于肯定确认)
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认)

Multiple 的解释:

        手动应答的好处是可以批量应答并且减少网络拥堵

  •  true 代表批量应答 channel 上未应答的消息
  • false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

RabbitMQ-00000018

   3.3 消息自动重新入队

  3.4 消息手动应答代码

        消费者在上面代码的基础上增加了以下内容

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

4.RabbitMQ 持久化

  4.1 队列如何实现持久化

//让队列持久化
boolean durable = true;
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

  4.2 消息实现持久化

        需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性

RabbitMQ-00000028

 5.不公平分发

  为了避免这种情况,在消费者中消费之前,我们可以设置参数 channel.basicQos(1);

//不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

6.预取值分发

        本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/58848.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

培训报名小程序报名确认开发

目录 1 创建页面2 创建URL参数3 信息展示4 消息订阅5 页面传参6 程序预览总结 我们上一篇介绍了报名功能的开发&#xff0c;在用户报名成功后需要展示报名的确认信息&#xff0c;如果信息无误提示用户支付&#xff0c;在支付之前需要让用户进行授权&#xff0c;允许小程序给用户…

Spring 知识点

Spring 1.1 Spring 简介 1.1.1 Spring 概念 Spring是一个轻量级Java开发框架&#xff0c;最早有Rod Johnson创建为了解决企业级应用开发的业务逻辑层和其他各层的耦合问题Spring最根本的使命是解决企业级应用开发的复杂性&#xff0c;即简化Java开发。使现有的技术更加容易使…

从8个新 NFT AMM,聊聊能如何为 NFT 提供流动性

DeFi 的出现&#xff0c;开启了数字金融民主化的革命。其中&#xff0c;通过 AMM 自由创建流动性池极大地增加了 ERC-20 Token 的流动性&#xff0c;并为一些长尾 Token 解锁了价值的发现&#xff0c;因而今天在链上可以看到各种丰富的交易、借贷和杠杆等活动。 而另一方面&am…

C语言一些有趣的冷门知识

文章目录 概要1.访问数组元素的方法运行结果 2.中括号的特殊用法运行结果 3.大括号的特殊用法运行结果 4.sizeof的用法运行结果 5.渐进运算符运行结果 小结 概要 本文章只是介绍一些有趣的C语言知识&#xff0c;纯属娱乐。这里所有的演示代码我是使用的编译器是Visual Studio …

linux基于信号量实现多线程生产者消费者模型

基于信号量实现多线程生产者消费者模型。 编程思路&#xff1a; 1.食物的初始化编号为100&#xff1a; beginnum 100&#xff1b; 2.仓库有5个空碗&#xff0c;最多保存5个食物&#xff1a;queue[5]&#xff1b; 3.初始化空碗的数量为5&#xff0c;食物的数量为0&#xff1a…

Go context.WithCancel()的使用

WithCancel可以将一个Context包装为cancelCtx,并提供一个取消函数,调用这个取消函数,可以Cancel对应的Context Go语言context包-cancelCtx 疑问 context.WithCancel()取消机制的理解 父母5s钟后出门&#xff0c;倒计时&#xff0c;父母在时要学习&#xff0c;父母一走就可以玩 …

python之prettytable库的使用

文章目录 一 什么是prettytable二 prettytable的简单使用1. 添加表头2. 添加行3. 添加列4. 设置对齐方式4. 设置输出表格样式5. 自定义边框样式6. 其它功能 三 prettytable在实际中的使用 一 什么是prettytable prettytable是Python的一个第三方工具库&#xff0c;用于创建漂亮…

爬虫来介绍ChromeF12 谷歌开发者工具 -Network

了解网页基础(HTML、CSS、JavaScript) 了解HTTP基本原理 了解JSON格式 了解Ajax请求 了解爬虫基本原理 (一)、Chrome开发者工具面板概述 Elements 查找网页源代码HTML中的任一元素,手动修改任一元素的属性和样式且能实时在浏览器里面得到反馈。 比如我们在Event Listener…

springboot vue 初步集成onlyoffice

文章目录 前言一、vue ts1. 安装依赖2. onlyoffice组件实现&#xff08;待优化&#xff09;3. 使用组件4. 我的配置文件 二、springboot 回调代码1. 本地存储 三、效果展示踩坑总结问题1问题2 前言 对接onlyoffice&#xff0c;实现文档的预览和在线编辑功能。 一、vue ts …

Android数据存储选项:SQLite、Room等

Android数据存储选项&#xff1a;SQLite、Room等 1. 引言 在移动应用的开发过程中&#xff0c;数据存储是至关重要的一环。无论是用户的个人信息、设置配置还是应用产生的临时数据&#xff0c;都需要在设备上进行存储以便随时访问。随着移动应用的日益发展&#xff0c;数据存…

Openlayers实战:判断共享单车是否在电子围栏内

共享单车方便了我们的日常生活,解决了后一公里的行程问题。为了解决共享单车乱放的问题,运营部门规划出一些围栏,配合到电子地图上即为电子围栏,只有放在围栏内才能停车结算,在我们的Openlayers实战示例中,即模拟这一场景。 效果图 源代码 /* * @Author: 大剑师兰特(x…

开源数据集分类汇总(医学,卫星,分割,分类,人脸,农业,姿势等)

本文汇总了医学图像、卫星图像、语义分割、自动驾驶、图像分类、人脸、农业、打架识别等多个方向的数据集资源&#xff0c;均附有下载链接。 该文章仅用于学习记录&#xff0c;禁止商业使用&#xff01; 1.医学图像 疟疾细胞图像数据集 下载链接&#xff1a;http://suo.nz/2V…