RabbitMQ 入门示例

参考:BV15k4y1k7Ep

RabbitMQ 相关概念及简述中简单介绍了 RabbitMQ 提供的 6 种工作模式。下面以简单模式为例,介绍 RabbitMQ 的使用。

新建工程

先新建 Maven 工程 RabbitMQ 作为父工程,在父工程下新建三个子模块:

  • common:公共包
  • producer:生产者
  • consumer:消费者

在三个模块中添加 amqp-client 依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version>
</dependency>

在 producer 和 consumer 中添加 common 依赖:

<dependency><groupId>com.zhangmingge.rabbitmq</groupId><artifactId>common</artifactId><version>1.0-SNAPSHOT</version>
</dependency>

编写 common

在 common 中添加用于获取 connection 的工具类,后面 producer 和 consumer 都会用到:

package com.zhangmingge.rabbitmq;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {// 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 主机地址;默认为 localhostconnectionFactory.setHost("192.168.88.128");// 连接端口;默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称;默认为 /connectionFactory.setVirtualHost("/vhost");// 连接用户名;默认为 guestconnectionFactory.setUsername("admin");// 连接密码;默认为 guestconnectionFactory.setPassword("123456");// 创建连接return connectionFactory.newConnection();}}

编写 producer

package com.zhangmingge.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {// 创建连接Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 要发送的信息String message = "你好;小兔子!";/** 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange* 参数 2:路由 key,简单模式可以传递队列名称* 参数 3:消息其它属性* 参数 4:消息内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已发送消息:" + message);// 关闭资源channel.close();connection.close();}
}

在执行上述的消息发送之后,登录 RabbitMQ 的管理控制台,可以发现队列和其中的消息:

image-20240825153428438
image-20240825153542014

编写 consumer

package com.zhangmingge.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 创建频道Channel channel = connection.createChannel();// 声明(创建)队列/** 参数 1:队列名称* 参数 2:是否定义持久化队列* 参数 3:是否独占本次连接* 参数 4:是否在不使用的时候自动删除队列* 参数 5:队列其它参数*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);// 创建消费者:并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {@Override/** consumerTag 消息者标签,在 channel.basicConsume 时候可以指定* envelope 消息包的内容,可从中获取消息 id,消息 routingKey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {// 路由 keySystem.out.println("路由 key 为:" + envelope.getRoutingKey());// 交换机System.out.println("交换机为:" + envelope.getExchange());// 消息 idSystem.out.println("消息 id 为:" + envelope.getDeliveryTag());// 收到的消息System.out.println("接收到的消息为:" + new String(body, "utf-8"));}};// 监听消息/** 参数 1:队列名称* 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认* 参数 3:消息接收到后回调*/channel.basicConsume(Producer.QUEUE_NAME, true, consumer);// 不关闭资源,应该一直监听消息// channel.close();// connection.close();}
}

运行 consumer 后,可以看到 consumer 打印的日志消息,每运行一次 producer,consumer 就会对应打印一次消息。

小结

上述的入门案例中使用的是如下的简单模式:

1555991074575

在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序。
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/787093.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kubernetes大规模集群

Kubernetes资源限制 Kubernetes 单个集群支持的最大节点数为 5,000。Kubernetes标准的配置:每个节点的 Pod 数量不超过 110 节点数不超过 5,000 Pod 总数不超过 150,000 容器总数不超过 300,000你可以通过添加或删除节点来扩展集群。集群扩缩的方式取决于集群的部署方式 以下集…

WEB开发技术演变

什么是web开发 Web开发指的是网页系统开发,一说到网页,我想大概大部分人都会熟悉www,每次在浏览器中输入网址时,总会先输入www,这里其实是World Wide Web的简称,现在也简称Web, web技术发展 静态网页时代 1994年,网景公司(Netscape)发布了Navigator浏览器0.9版。这是…

go语言免杀-garble混淆

go语言免杀-garble混淆,禁止转载。题记 “愿先生心境,四季如春”作者回答:剑来之所以最大宗旨,是“我们不要轻易对这个世界失望”,因为道理太简单不过了,我们每个人在现实生活当中,太容易对人对事,产生大大小小的失望。而“愿先生心境四季如春”这句话,以后在书中会被…

delphi dxCameraControl控件(拍照)

拍照演示 DevExpressVCL 组件之一TdxCameraControl Object Hierarchy Properties Methods Events一个摄像头控件 Unit dxCameraControlSyntaxTdxCameraControl = class(TdxCustomCameraControl)Descrition该控件允许您捕捉视频或图像从内置/连接的网络摄像头或设备、前后…

sentinel-服务接入原理

通过sentinel前世今生介绍,我们知道了sentinel流控主要是依赖sentinel-core,但是我们生产环境往往需要动态更新流控规则所以需要集成nacos、zookeeper、redis、mysql、等中间存储。配置的复杂性和规则的复杂性我们需要可视化的方式对规则进行管理,我们需要集成dashboard。 这…

*2024.8.25 鲜花

没啥文采,写的不好。NTERNET OVERDOSE この混沌とした 令和のインターネットを照らす 一筋の光 電子の海を漂うオタクに笑顔を 未来の平和をお約束 躁鬱だけどまかせとけ インターネット・エンジェル ただいま降臨 社会をやめろ 家族をやめろ 人間関係をやめろ 今すぐ薄暗い部…

5分钟说透chatgpt

5分钟说清楚 ——到底它为啥能这么火? ——到底牛逼在哪? ——到底我能用来干嘛?把“他”想象成一个博览群书的人 想象一下,现在有一个知识非常渊博的一个人,博览群书,掌握了绝大多数的人类文本知识。(没错,chatgpt确实就是掌握了这么多,而且随着模型的增长,他会看更…

sentinel-前世今生

方便理解sentinel,假如我们自己要实现一套sentinel sentinel前世今生 方便理解sentinel,假如我们自己要实现一套sentinel 第一阶段 一心助手业务服务出现异常,通过监控大盘,发现超过自身服务能够承载的流量,导致请求出现大量排队,服务阻塞,进而导致其他依赖服务出现雪崩效应…

2024.8.25 鲜花

没啥文采,写的不好。NTERNET OVERDOSE この混沌とした 令和のインターネットを照らす 一筋の光 電子の海を漂うオタクに笑顔を 未来の平和をお約束 躁鬱だけどまかせとけ インターネット・エンジェル ただいま降臨 社会をやめろ 家族をやめろ 人間関係をやめろ 今すぐ薄暗い部…

zabbix-grafana配置

一、grafana 安装配置 安装grafana # yum install -y https://dl.grafana.com/oss/release/grafana-11.1.4-1.x86_64.rpm启动grafana # systemctl start grafana-servergrafana 在线安装 zabbix 插件,重启grafana服务 # grafana-cli plugins list-remote | grep -i zabbix id:…

Neo-GNNs: Neighborhood Overlap-aware Graph Neural Networks for Link Prediction

目录概符号说明MotivationNeo-GNN代码Neo-GNNs: Neighborhood overlap-aware graph neural networks for link prediction. NeurIPS, 2021.概 一种计算上相对高效的, 同时利用结构信息和特征信息的链接预测模型. 符号说明\(\mathcal{G} = (\mathcal{V}, \mathcal{E})\), graph;…

Thanos HA

ThanosHA组件HA组件Sidercar Receiver其它组件Querier Store Compactor RulerSidecar & Receiver工作方式Sidecar 为实现高可用,Sidecar组件与Prometheus运行在一个Pod中,双副本的Prometheus独立运行采集数据(scrape metrics),默认情况部署在Kubernetesk的Prometheus使…