1.创建工程并引入依赖
<!-- 添加rocketmq的启动器--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version></dependency>
2.编写配置
rocketmq:name-server: 192.168.16.136:9876 #配置NameServer地址producer:group: "g1" #配置生产者组名称consumer:group: "g1" #配置消费者组名称
注:通过在配置文件中指定名称服务器和生产者组名等属性,RocketMQ的Spring Boot集成可以根据这些配置信息来创建和初始化RocketMQ的生产者。
这样就可以在应用程序中方便地使用RocketMQ发送消息到指定的名称服务器和生产者组。
3.编写一个监听器
@Component
//这是RocketMQ的消息监听器注解,它指定了要监听的消息主题(topic)和消费者组(consumerGroup)。${rocketmq.consumer.group}表示消费者组的值是从配置文件中获取的。
@RocketMQMessageListener(topic = "t1",consumerGroup ="${rocketmq.consumer.group}" )
public class ConsumerListener implements RocketMQListener<String> {
// 当消费者接收到消息时将会调用该方法。String message是接收到的消息内容。@Overridepublic void onMessage(String message) {System.out.println("消费者收到了生产者的消费:"+message+",已经消费!");}
}
3.编写控制器
@RestController
public class ProducerController {@ResourceRocketMQTemplate rocketMQTemplate;@GetMapping("/test")public String producerTest(){
// 1.发送一个字符串的消息,发送普通消息rocketMQTemplate.convertAndSend("t1","第一条消息");// 2.发送单向消息rocketMQTemplate.sendOneWay("t1","单项消息");// 3.发送同步消息rocketMQTemplate.syncSend("t1","同步消息");// 4.发送异步消息rocketMQTemplate.asyncSend("t1", "异步消息", new SendCallback() {
// 发送成功的时候会触发的方法@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("------发送成功------"+sendResult);}
// 发送失败时会触发的方法@Overridepublic void onException(Throwable throwable) {System.out.println(throwable);}});// 5.发送顺序消息rocketMQTemplate.syncSendOrderly("t1","顺序消息","1");// 6.事务消息rocketMQTemplate.sendMessageInTransaction("t1", new Message<String>() {
// 获取内容@Overridepublic String getPayload() {return "这是发送的事务消息";}
// 消息头@Overridepublic MessageHeaders getHeaders() {return null;}},"2");return "ok";}
}
注:通过使用RocketMQTemplate,您可以在Spring Boot应用程序中轻松地与RocketMQ进行交互,实现消息的发送和接收。
RocketMQTemplate提供了以下一些常用的方法:
convertAndSend(topic, message):将消息发送到指定的主题。
convertAndSend(topic, key, message):将带有指定键的消息发送到指定的主题。
syncSend(topic, message):同步发送消息到指定的主题,并等待发送结果。
syncSendOrderly(topic, message, hashKey):有序地发送消息到指定的主题,保证同一个hashKey的消息发送到同一个消息队列。
asyncSend(topic, message, sendCallback):异步发送消息到指定的主题,并提供发送结果的回调函数。
sendOneWay(topic, message):单向发送消息到指定的主题,不关心发送结果。
测试结果1: