添加rabbitmq依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>
配置文件中加入rabbitmq配置
server:port: 18072
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
在启动类上添加注解
@EnableRabbit
@SpringBootApplication
public class RabbitMqBootApplication {public static void main(String[] args) {SpringApplication.run(RabbitMqBootApplication.class, args);}}
创建Rabbit配置类
@Configuration
public class RabbitConfig {//定义队列public static final String SIMPLE_QUEUE_NAME = "mqTest1";@Beanpublic Queue simpleQueue() {/**durable(): 是否持久化,当mq 重启之后还在exclusive(): 是否独占:只能有一个消费者监听这个队列,当Connection 关闭时,是否删除队列autoDelete(): 是否自动删除,当没有Consumer 监听时,自动删除withArgument(): 参数*/return QueueBuilder.durable(SIMPLE_QUEUE_NAME).build();}
}
创建消费者
@Service
@Slf4j
public class RabbitMqConsumer {@RabbitListener(queues = "mqTest1")public void receive(@Payload String message){log.info("收到了mqTest1队列消息:" + message);}
}
创建生产者
@Slf4j
@RestController
@RequestMapping("/mq")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public String send(String message){rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE_NAME,message);return "发送 " + message + " 到" + RabbitConfig.SIMPLE_QUEUE_NAME;}}
压测
使用for循环创建20个线程,每个线程向队列中插入一百万条数据
@RequestMapping("/strongSend")public String strongSend(){for (int i = 0; i < 20; i++) {new Thread(() -> {for (int i1 = 0; i1 < 1000000; i1++) {rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE_NAME,Thread.currentThread().getName());}}).start();}return "压测完成";}
启动项目进行压测(记得把消费者关掉,或者消费者另启一个项目)
调用压测接口
进入rabbitmq管理页面查看
已经写入了96万数据,写入速度约每秒1.5万条,后面启动消费者进行消费即可