一、消息中间件模式分部
queue 一对一。实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。
topic 一对多。实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。
详见 自己连接
二、 代码
- 添加activemq的依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
- 配置 yml
spring.activemq.broker-url=tcp://172.16.154.27:61616
spring.activemq.user=admin
spring.activemq.password=123456
spring.activemq.in-memory=true
spring.activemq.pooled=false
- 工厂配置文件,整合amq
@Configuration
public class ActiveMQConfig {@Beanpublic JmsListenerContainerFactory<?> queueListenerFactory(@Qualifier("activeMQConnectionFactory") ActiveMQConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();container.setConcurrentConsumers(3);container.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setConcurrency("3-15"); //连接数factory.setRecoveryInterval(1000L); //重连间隔时间factory.setSessionAcknowledgeMode(4);return factory;}@Beanpublic ActiveMQConnectionFactory activeMQConnectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setTrustAllPackages(true);connectionFactory.setRedeliveryPolicy(redeliveryPolicy());return connectionFactory;}@Beanpublic RedeliveryPolicy redeliveryPolicy(){RedeliveryPolicy redeliveryPolicy=new RedeliveryPolicy();//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次redeliveryPolicy.setMaximumRedeliveries(5);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(1);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//是否避免消息碰撞redeliveryPolicy.setUseCollisionAvoidance(false);//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效redeliveryPolicy.setMaximumRedeliveryDelay(-1);return redeliveryPolicy;}@Beanpublic JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory){JmsTemplate jmsTemplate=new JmsTemplate();jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化jmsTemplate.setConnectionFactory(activeMQConnectionFactory);jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式return jmsTemplate;}}
- 定义 queue。(也可以topic)
@Configuration
public class QueueConfig {@Bean(name="telQueue")public Queue telQueue() {return new ActiveMQQueue(MESSAGE_TEL);}@Bean(name = "emailQueue")public Queue emailQueue() {return new ActiveMQQueue(MESSAGE_EMAIL);}}
- 定义生产者----使用jmsTemplate
@Slf4j
@Component
public class QueueSender {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Resource(name = "telQueue")private Queue telQueue;@Resource(name = "emailQueue")private Queue emailQueue;public void sendTel(final MessageTel message){log.info("发送短信message={}",message);this.jmsMessagingTemplate.convertAndSend(telQueue,message);}public void sendEmail(final MessageEMail message){log.info("发送email message={}",message);this.jmsMessagingTemplate.convertAndSend(emailQueue,message);}}
- 定义消费者—使用@JmsListener。@JmsListener可指定多个destination 监听多个队列
@Slf4j
@Component
public class QueueReceiver {@Autowiredprivate MailUtil mailUtil;@Autowiredprivate SendSMSUtil sendSMSUtil;/*** 发送短信** @param messageTel*/@JmsListener(destination = QueueConfig.MESSAGE_TEL, containerFactory = "queueListenerFactory")public void receiveTel1(MessageTel messageTel) {log.info("-----receive tel message-----"); }/*** 发送邮件** @param messageEMail*/@JmsListener(destination = QueueConfig.MESSAGE_EMAIL, containerFactory = "queueListenerFactory")// 可监听多个队列// @JmsListeners(value = {@JmsListener(destination = "T1"), @JmsListener(destination = "T2")})public void receiveEmail(MessageEMail messageEMail) {log.info("-----receive email message-----");}
}
消费消息有2种方法,
一种是调用consumer.receive()
方法,该方法将阻塞直到获得并返回一条消息。这种情况下,消息返回给方法调用者之后就自动被确认了。
另一种方法是采用listener回调函数,在有消息到达时,会调用listener接口的onMessage
方法。在这种情况下,在onMessage方法执行完毕后,消息才会被确认,此时只要在方法中抛出异常,该消息就不会被确认。
三、JMSTemplate
官方API链接在此,恶灵退散~
常用对比。注意send和convertAndSend