AMQ基础

一、消息中间件模式分部

queue一对一实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。

topic一对多实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。

详见 自己连接

二、 代码

  1. 添加activemq的依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  1. 配置 yml
spring.activemq.broker-url=tcp://172.16.154.27:61616
spring.activemq.user=admin
spring.activemq.password=123456
spring.activemq.in-memory=true
spring.activemq.pooled=false
  1. 工厂配置文件,整合amq
@Configuration
public class ActiveMQConfig {@Beanpublic JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();container.setConcurrentConsumers(3);container.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setConcurrency("3-15");  //连接数factory.setRecoveryInterval(1000L); //重连间隔时间factory.setSessionAcknowledgeMode(4);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setTrustAllPackages(true);connectionFactory.setRedeliveryPolicy(redeliveryPolicy());return connectionFactory;}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次redeliveryPolicy.setMaximumRedeliveries(5);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}}
  1. 定义 queue。(也可以topic)
@Configuration
public class QueueConfig {@Bean(name="telQueue")public Queue telQueue() {return new ActiveMQQueue(MESSAGE_TEL);}@Bean(name = "emailQueue")public Queue emailQueue() {return new ActiveMQQueue(MESSAGE_EMAIL);}}
  1. 定义生产者----使用jmsTemplate
@Slf4j
@Component
public class QueueSender {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Resource(name = "telQueue")private Queue telQueue;@Resource(name = "emailQueue")private Queue emailQueue;public void sendTel(final MessageTel message){log.info("发送短信message={}",message);this.jmsMessagingTemplate.convertAndSend(telQueue,message);}public void sendEmail(final MessageEMail message){log.info("发送email message={}",message);this.jmsMessagingTemplate.convertAndSend(emailQueue,message);}}
  1. 定义消费者—使用@JmsListener。@JmsListener可指定多个destination 监听多个队列
@Slf4j
@Component
public class QueueReceiver {@Autowiredprivate MailUtil mailUtil;@Autowiredprivate SendSMSUtil sendSMSUtil;/*** 发送短信** @param messageTel*/@JmsListener(destination = QueueConfig.MESSAGE_TEL, containerFactory = "queueListenerFactory")public void receiveTel1(MessageTel messageTel) {log.info("-----receive tel message-----"); }/*** 发送邮件** @param messageEMail*/@JmsListener(destination = QueueConfig.MESSAGE_EMAIL, containerFactory = "queueListenerFactory")// 可监听多个队列// @JmsListeners(value = {@JmsListener(destination = "T1"), @JmsListener(destination = "T2")})public void receiveEmail(MessageEMail messageEMail) {log.info("-----receive email message-----");}
}

消费消息有2种方法,
  一种是调用consumer.receive()方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。
  另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。

三、JMSTemplate

官方API链接在此,恶灵退散~
常用对比。注意send和convertAndSend
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/230711.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

优化邮件群发效果的方法与策略

怎样优化邮件群发效果&#xff1f;这是许多企业在进行邮件营销时常常被问到的问题。邮件营销是一种高效且经济实惠的市场推广方式&#xff0c;但如何使邮件真正引起接收者的兴趣并产生预期的效果并不容易。好的营销效果可以带来高回报、高收益率&#xff0c;但是怎么提升群发效…

Chrome显示分享按钮

分享按钮不见了&#xff01; Chrome://flags Chrome Refresh 2023 Disabled 左上角的标签搜索会到右上角。

文件重命名:如何删除文件名中的下划线,特殊符号批量删除

在日常的工作中&#xff0c;经常会遇到文件名中包含特殊符号的情况&#xff0c;例如&#xff0c;一些文件名可能包含下划线、空格或其他特殊符号&#xff0c;这些符号可能会干扰我们的文件搜索和识别。此外&#xff0c;一些文件名可能包含无法识别的非标准字符&#xff0c;这可…

RT-DETR算法优化改进:AKConv(可改变核卷积),即插即用的卷积,效果秒杀DSConv | 2023年11月最新发表

💡💡💡本文全网首发独家改进:可改变核卷积(AKConv),赋予卷积核任意数量的参数和任意采样形状,为网络开销和性能之间的权衡提供更丰富的选择,解决具有固定样本形状和正方形的卷积核不能很好地适应不断变化的目标的问题点,效果秒殺DSConv 1)AKConv替代标准卷积进行…

纵行科技获评“汽车物流行业优秀技术装备供应商”

近日&#xff0c;由中国物流与采购联合会主办&#xff0c;中物联汽车物流分会承办的“2023年全国汽车物流行业年会”在湖北十堰盛大召开。本次年会集合了汽车整车、零部件、售后备件、进出口物流企业和物流装备技术企业、科研机构及院校等&#xff0c;分享汽车物流行业现状、相…

ArkTS-ohpm控制台无法使用

ohpm : 无法将“ohpm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写&#xff0c;如果包括路径&#xff0c;请确保路径正确&#xff0c;然后再试一次。 所在位置 行:1 字符: 1 ohpm install ohos/axiosCategoryInfo : ObjectNotFound: (ohpm:String) […

C语言进阶指南(16)(自定义数据类型——结构体)

欢迎来到博主的专栏——C语言进阶指南 博主id&#xff1a;reverie.ly 文章目录 结构体类型结构体类型的声明结构体变量的声明 结构体变量的初始化结构体变量结构体变量的赋值结构体变量的成员结构体变量的使用结构体变量的内存存储 前面使用的变量都是简单类型的变量&#xff0…

wvp 视频监控平台抓包分析

抓包时机 下面的抓包时机是抓包文件最新&#xff0c;但是最有用的包 选择网卡开始抓包 如果之前已经选择网卡&#xff0c;直接开始抓包 停止抓包 重新抓包 sip播放过程分析 过滤条件 tcp.port 5060 and sip 可以看到有这些包 选择任何一个 &#xff0c;戍边右键--追踪流--…

操作系统 day14(进程同步、进程互斥、互斥的代码实现、互斥的硬件实现、互斥锁)

进程同步 概念 进程的异步性体现在&#xff0c;例如&#xff1a;当有I/O操作时&#xff0c;进程需要等待I/O操作&#xff0c;而每个I/O操作又是不同的&#xff0c;所以进程没有一个固定的顺序&#xff0c;固定的时间来执行&#xff0c;而这体现了进程的异步性。 进程互斥 …

【LeetCode:907. 子数组的最小值之和 | 贡献法 乘法原理 单调栈】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

光伏、储能双层优化配置接入配电网研究(附带Matlab代码)

由于能源的日益匮乏&#xff0c;电力需求的不断增长等&#xff0c;配电网中分布式能源渗透率不断提高&#xff0c;且逐渐向主动配电网方向发展。此外&#xff0c;需求响应(demand response&#xff0c;DR)的加入对配电网的规划运行也带来了新的因素。因此&#xff0c;如何综合考…

CodeMeter软件保护及授权管理解决方案(二)

客户端管理工具 CodeMeter Runtime是CodeMeter解决方案中的重要组成部分&#xff0c;其为独立软件包&#xff0c;开发者需要把CodeMeter Runtime和加密后的软件一起发布。CodeMeter Runtim包括以下组件用于实现授权的使用&#xff1a; CodeMeter License Server授权服务器 Co…