rabbitmq 基本总结

rabbitmq 的基本概念 vhost、broker、producer、 consumer、 exchange、 queue、 routing key

rabbitmq 常用的队列类型,工作队列(简单队列),pub/sub, routing key, topic 模式

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>
public class RabbitmqClientDemo {private static ConnectionFactory factory = new ConnectionFactory();private static String EXCHANGE_NAME = "exchange.fanout";private static String FANOUT_QUEUE = "queue.fanout";private static String DIRECT_EXCHANGE = "exchange_direct";private static String QUEUE_DIRCT = "queue.direct.02";private static String QUEUE_TOPIC_ONE = "queue.topic.01";private static String QUEUE_TOPIC_TWO = "queue.topic.02";private static String QUEUE_TOPIC_THREE = "queue.topic.03";private static String ROUNTING_KEY_ONE = "routing.key.01";private static String ROUNTING_KEY_TWO = "routing.key.02";private static String ROUNTING_KEY_THREE = "routing.key.03";private static String DEAD_MESSAGE_EXCHANGE = "EXCHANGE_DEAD";private static String DEAD_QUEUE = "queue.dead";static {factory.setHost("192.168.233.128");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");}public static Connection getConnection() throws IOException, TimeoutException {return factory.newConnection();}public static Channel createChannel() throws IOException, TimeoutException {Connection connection = getConnection();return connection.createChannel();}public static void main(String[] args) {//new WorkQueueProducer().start();//new WorkerConsumer().start();/*  new PublishConsumer().start();new PublishProducer().start();*///new TopicProducer().start();//new TopicConsumer().start();new DeadMessageProducer().start();new DeadMessageConsumer().start();}static class WorkQueueProducer extends Thread {@Overridepublic void run() {try {Connection connection = getConnection();Channel channel = connection.createChannel();channel = connection.createChannel();channel.queueDeclare("hello", false, false, false, null);channel.basicPublish("", "hello", null, "hello".getBytes());} catch (Exception e) {e.printStackTrace();} finally {try {//hannel.close();} catch (Exception e) {e.printStackTrace();}}}}static class WorkerConsumer extends Thread {@Overridepublic void run() {try {Connection connection = getConnection();Channel channel = connection.createChannel();channel.queueDeclare("hello", false, false, false, null);channel.basicQos(1);channel.basicConsume("hello", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(new String(delivery.getBody()));}}, consumerTag -> {});} catch (Exception e) {e.printStackTrace();} finally {try {//channel.close();} catch (Exception e) {e.printStackTrace();}}}}static class PublishProducer extends Thread {@Overridepublic void run() {try {Channel channel = createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);channel.queueBind(FANOUT_QUEUE, EXCHANGE_NAME, "");for (int i = 1; i <= 40; i++) {String message = String.format("current orderId is %d, money is %d", UUID.randomUUID(), new Random().nextDouble());channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));}} catch (Exception e) {e.printStackTrace();} finally {try {//channel.close();} catch (Exception e) {e.printStackTrace();}}}}static class PublishConsumer extends Thread {@Overridepublic void run() {try {Channel channel = createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);channel.queueBind(FANOUT_QUEUE, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));};while (true) {channel.basicConsume(FANOUT_QUEUE, true, deliverCallback, consumerTag -> {});}} catch (Exception e) {e.printStackTrace();} finally {try {//channel.close();} catch (Exception e) {e.printStackTrace();}}}}static class TopicProducer extends Thread {@Overridepublic void run() {try {Channel channel = createChannel();channel.exchangeDeclare(DIRECT_EXCHANGE, "topic");channel.queueDeclare(QUEUE_TOPIC_ONE, true, false, false, null);channel.queueDeclare(QUEUE_TOPIC_TWO, true, false, false, null);channel.queueDeclare(QUEUE_TOPIC_THREE, true, false, false, null);channel.queueBind(QUEUE_TOPIC_ONE, DIRECT_EXCHANGE, ROUNTING_KEY_ONE);channel.queueBind(QUEUE_TOPIC_TWO, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);channel.queueBind(QUEUE_TOPIC_TWO, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_THREE);channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_ONE);channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);for (int i = 1; i <= 10; i++) {String message = String.format("current orderId is %s, money is %s", UUID.randomUUID().toString(), new Random().nextDouble());if (i % 3 == 0) {System.out.println("send to topic1");channel.basicPublish(DIRECT_EXCHANGE, ROUNTING_KEY_ONE, null, message.getBytes(StandardCharsets.UTF_8));} else if (i % 3 == 1) {System.out.println("send to topic2");channel.basicPublish(DIRECT_EXCHANGE, ROUNTING_KEY_TWO, null, message.getBytes(StandardCharsets.UTF_8));} else {System.out.println("send to topic3");channel.basicPublish(DIRECT_EXCHANGE, ROUNTING_KEY_THREE, null, message.getBytes(StandardCharsets.UTF_8));}}} catch (Exception e) {e.printStackTrace();}}}static class TopicConsumer extends Thread {@Overridepublic void run() {try {Channel channel = createChannel();channel.exchangeDeclare(DIRECT_EXCHANGE, "topic");channel.queueDeclare(QUEUE_TOPIC_THREE, true, false, false, null);channel.queueBind(QUEUE_TOPIC_THREE,EXCHANGE_NAME,"routing.key.*")DeliverCallback deliverCallback = (consumerTag, delivery) -> {System.out.println(delivery.getEnvelope().getRoutingKey());System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));};while (true) {channel.basicConsume(QUEUE_TOPIC_THREE, true, deliverCallback, consumerTag -> {});}} catch (Exception e) {e.printStackTrace();} finally {try {//channel.close();} catch (Exception e) {e.printStackTrace();}}}}static class DeadMessageProducer extends Thread {@Overridepublic void run() {try {Channel channel = createChannel();channel.exchangeDeclare(DIRECT_EXCHANGE, "direct");channel.queueDeclare(DEAD_QUEUE, true, false, false, null);channel.queueBind(DEAD_QUEUE, DEAD_MESSAGE_EXCHANGE, "routing.direct02");for (int i = 1; i <= 40; i++) {String message = String.format("current orderId is %s, money is %s", UUID.randomUUID().toString(), new Random().nextDouble());AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().expiration("30000").build();channel.basicPublish(DIRECT_EXCHANGE, "routing.direct02", prop, message.getBytes(StandardCharsets.UTF_8));System.out.println("send to topic1");}} catch (Exception e) {e.printStackTrace();}}}static class DeadMessageConsumer extends Thread {@Overridepublic void run() {try {Channel channel = createChannel();/*channel.exchangeDeclare(DEAD_MESSAGE_EXCHANGE, "direct");Map<String, Object> deadLetterParams = new HashMap<>(2);deadLetterParams.put("x-dead-letter-exchange", DEAD_MESSAGE_EXCHANGE);deadLetterParams.put("x-dead-letter-routing-key", "routing.dead02");deadLetterParams.put("x-max-length", 2);*//*channel.queueDeclare(QUEUE_DIRCT, true, false, false, deadLetterParams);channel.queueBind(DEAD_QUEUE, DEAD_MESSAGE_EXCHANGE, "routing.dead02");*/channel.exchangeDeclare(DIRECT_EXCHANGE, "direct");channel.queueBind(QUEUE_DIRCT, DIRECT_EXCHANGE, "routing.direct02");DeliverCallback callback  = (consumerTag, delivery) -> {System.out.println(delivery.getEnvelope().getRoutingKey());System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));};/* DeliverCallback callback = (consumerTag, delivery) -> {String receivedMessage = new String(delivery.getBody());System.out.println("C1接收到消息:" + receivedMessage + "并且拒绝签收了");// 禁止重新入队channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);};*/while (true) {//channel.basicConsume(QUEUE_DIRCT, true, deliverCallback, consumerTag -> {});channel.basicConsume(QUEUE_DIRCT, true, callback, (consumerTag) -> {System.out.println(consumerTag + "消费者取消消费消息");});}} catch (Exception e) {e.printStackTrace();} finally {try {//channel.close();} catch (Exception e) {e.printStackTrace();}}}}

整合springboot

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>3.1.5</version>
</dependency>

rabbitmq 的核心配置(相比于其他的mq,rabbit 有图形用户界面,可以傻瓜操作)

https://blog.csdn.net/leesinbad/article/details/128670794

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/526574.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】tcp_transmit_skb

一、__tcp_transmit_skb讲解 这个函数 __tcp_transmit_skb() 是 Linux 内核中 TCP/IP 协议栈的一部分&#xff0c;负责处理传输控制协议&#xff08;TCP&#xff09;数据包的发送。具体来说&#xff0c;这个函数将 TCP 头部添加到一个没有任何头部信息的 socket buffer (sk_bu…

maven中dependencyManagement

如果所在pom中dependency引入的依赖没有指定版本号&#xff0c;会以pom中dependencyManagement所制定的版本号为准吗 是的&#xff0c;如果在项目的 <dependency> 元素中没有指定版本号&#xff0c;而且该依赖在 <dependencyManagement> 中有指定版本号&#xff0…

springboot255基于spring boot的疫情信息管理系统

疫情信息管理系统的设计与实现 摘要 近年来&#xff0c;信息化管理行业的不断兴起&#xff0c;使得人们的日常生活越来越离不开计算机和互联网技术。首先&#xff0c;根据收集到的用户需求分析&#xff0c;对设计系统有一个初步的认识与了解&#xff0c;确定疫情信息管理系统…

童装WP模板

童装WP模板 https://www.wpniu.com/moban/6359.html

【io.net空投】交互攻略

一、io.net是什么 Io.net 是一个基于 Solana 的DePIN项目&#xff0c;为人工智能 (AI) 和机器学习 (ML) 公司聚合 GPU 资源。 Io.net 的例子&#xff0c;就是鼓励大家出借 GPU 算力&#xff0c;为 AI 或机器学习&#xff08;ML&#xff09;公司提供更低价、更有效率的算力资源…

MySQL--explain执行计划详解

什么是执行计划&#xff1f; SQL的执行计划&#xff0c;通俗来说就是SQL的执行情况&#xff0c;一条SQL语句扫描哪些表&#xff0c;那个子查询先执行&#xff0c;是否用到了索引等等&#xff0c;只有当我们知道了这些情况之后才知道&#xff0c;才可以更好的去优化SQL&#xf…

一劳永逸的方法解决:LNK1168无法打开 xxx.exe 进行写入 报错问题

这种错误的产生原因&#xff1a; 运行程序退出不是按正常流退出&#xff0c;是按窗口右上角的 “X” 来关闭程序&#xff0c;但是后台的xxx.exe控制台程序还在运行&#xff1b;修改程序的代码后再运行&#xff0c;就会报LNK1168的错误&#xff1b; 报错示例&#xff1a; 解决方…

【Python】成功解决IndexError: list index out of range

【Python】成功解决IndexError: list index out of range &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到您的订…

Linux系统架构----nginx上构建虚拟主机

Linux系统架构----nginx上构建虚拟主机 一、构建虚拟主机概述 利用虚拟主机&#xff0c;不用为每个运行的网站提供一台单独的Nginx服务器或单独运行一组Nginx进程&#xff0c;虚拟主机提供了在同一台服务器、同一组Nginx进程上运行的多个网站的功能与Apache相同&#xff0c;N…

【无标题】数据化转型是什么

这里写自定义目录标题 如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入数据化转型…

求递归算法时间复杂性

递推方法 求n&#xff01;的递归算法&#xff1a; 该算法的时间复杂性&#xff1a; 递推过程&#xff1a; 主定理方法 要求&#xff1a;a>1,b>1 求解步骤&#xff1a; f(n)的渐进上界是以n的log以b为底的e次幂 判断关系后一定要满足这三个对应规则 例题&#xff1a;…

虚拟化之内存(Memory)

一 内存的查看方式 free -k/m/h cat /proc/meminfodmesg |grep memory free命令的实质是根据meminfo中的文件来提取信息 二 内存虚拟化 1.概念&#xff1a;由于物理MMU只能通过Host机的物理地址进行寻址&#xff0c;所以实现内存虚拟化&#xff0c;关键是需要将Guest机的…