当涉及到消息发送和接收的可靠性,Spring Boot提供了一些机制来确保消息的可靠传递。其中包括消息确认机制和重试机制。下面是一个示例代码,演示如何在Spring Boot中实现可靠的消息发送和接收。
首先,我们需要配置RabbitMQ的连接信息和相关属性。在application.properties
文件中添加以下配置:
# RabbitMQ连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest# 消息重试配置
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.multiplier=2
接下来,我们创建队列、一个消息发送者(Producer)和一个消息接收者(Consumer)的示例代码。
定义队列示列代码:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue fanout_queue_email() {
return new Queue("fanout_queue_email");
}
}
Producer(消息发送者)示例代码:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
@Autowired
public MessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("fanout_queue_sms", message);
System.out.println("消息发送成功:" + message);
}
}
在上面的代码中,我们注入了RabbitTemplate
对象,它是Spring Boot提供的用于发送消息的工具类。通过调用rabbitTemplate.convertAndSend()
方法发送消息到名为fanout_queue_sms
的队列
Consumer(消息接收者)示例代码:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = "fanout_queue_sms")
public void receiveMessage(String message) {
System.out.println("接收到消息:" + message);
// 模拟处理消息的业务逻辑
processMessage(message);
}
private void processMessage(String message) {
// 模拟业务处理耗时
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动抛出异常,触发消息重试
if (message.contains("error")) {
throw new RuntimeException("处理消息失败");
}
System.out.println("消息处理成功:" + message);
}
}
在上面的代码中,我们使用@RabbitListener
注解标记receiveMessage()
方法,表示它是一个消息监听器,监听名为fanout_queue_sms
的队列。当接收到消息时,会触发receiveMessage()
方法进行处理。
在processMessage()
方法中,我们模拟了业务处理的耗时,并通过手动抛出异常的方式触发消息重试机制。如果消息处理失败,将会抛出运行时异常,触发Spring Boot的重试机制。
调用示例代码:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class MessageApplication {
public static void main(String[] args) {
// 启动Spring Boot应用
ConfigurableApplicationContext context = SpringApplication.run(MessageApplication.class, args);
// 获取MessageProducer Bean
MessageProducer producer = context.getBean(MessageProducer.class);
// 发送消息
producer.sendMessage("Hello, World!error");
}
}
运行效果如下:
本文由 mdnice 多平台发布