使用springBoot整合rabbitMQ需要事先导入相关依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
生产者项目结构
绑定RabbitMQ的注册信息,yml文件:
#修改启动端口
server:port: 9001
#RabbitMQ 连接信息
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 8.137.76.12port: 5672
Service包下的fanout业务类:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import java.util.UUID;@Service public class FanoutService {//注入rabbitMQ模板 @Autowiredprivate RabbitTemplate rabbitTemplate;//定义交换机的名字private String exchangeName = "fanout_order_exchange";//定义路由keyprivate String routingKey = "";//制造订单public void makeOrder(Long userID,Long productID,int num){String uuid = UUID.randomUUID().toString();System.out.println("用户:"+userID+",订单是:"+uuid);rabbitTemplate.convertAndSend(exchangeName,routingKey,uuid);} }
config包的FanoutRabbitConfig配置类,声明队列,声明交换机,绑定关系:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class FanoutRabbitConfig {//注册rabbitMQ队列 @Beanpublic Queue qqQueue(){//队列的名称// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("qqQueue",true);}@Beanpublic Queue WeChatQueue(){//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("WeChatQueue",true);}@Beanpublic Queue smsQueue(){//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("smsQueue",true);}//声明交换机 @Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout_order_exchange",true,false);}//绑定关系,将交换机和队列进行绑定 @Beanpublic Binding bindQue1(){return BindingBuilder.bind(qqQueue()).to(fanoutExchange());}@Beanpublic Binding bindQue2(){return BindingBuilder.bind(WeChatQueue()).to(fanoutExchange());}@Beanpublic Binding bindQue3(){return BindingBuilder.bind(smsQueue()).to(fanoutExchange());} }
单元测试类:
import org.cqust.rabbitmq.service.FanoutService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitMqApplicationTests {@AutowiredFanoutService fanoutService;@Testvoid fanoutTest() throws InterruptedException {for (int i = 0; i < 5; i++) {Thread.sleep(1000);Long userId = 100L+ i;Long produceId = 10001L + i;int num = 10;fanoutService.makeOrder(userId,produceId,num);}} }
消费者项目结构
依旧需要填写连接信息:
server:port: 9002spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 8.137.76.12port: 5672
FanoutConsumer包下QQ消费者:
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "qqQueue") // 监听名为qqQueue的队列 public class QQConsumer {@RabbitHandlerpublic void qqMsg(String message){System.out.println("qq ----->"+message); // 打印接收到的消息 } }
FanoutConsumer包下sms消费者:
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "smsQueue") // 监听名为smsQueue的队列 public class smsConsumer {@RabbitHandlerpublic void qqMsg(String message){System.out.println("sms ----->"+message); // 打印接收到的消息 } }
FanoutConsumer包下wechat消费者:
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "WeChatQueue") // 监听名为WeChatQueue的队列 public class WechatConsumer {@RabbitHandlerpublic void qqMsg(String message){System.out.println("wechat ----->"+message); // 打印接收到的消息 } }
测试
使用生产者的单元测试,声明队列交换机,并且发送信息:
启动消费者的springBoot容器,并且接收信息:
-------------