Spring源码系列文章
RocketMQ(一):基本概念和环境搭建
RocketMQ(二):基础API
目录
- 一、RocketMQ快速入门
- 1、生产者发送消息
- 2、消费者接受消息
- 3、代理者位点和消费者位点
- 二、消费模型特点
- 1、同一个消费组的不同消费者,订阅主题必须相同
- 2、不同消费者组订阅同一主题,都会收到一份消息
- 3、消费者组内负载均衡模式,消费者固定队列接收消息
- 4、消费模式
- 三、发送不同类型消息
- 1、发送同步消息
- 2、发送异步消息
- 3、发送单向消息
- 4、发送延迟消息
- 5、发送批量消息
- 6、发送顺序消息
- 7、发送带标签的消息
- 8、发送带key的消息
一、RocketMQ快速入门
pom.xml
<!-- 原生的api -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version>
</dependency>
1、生产者发送消息
@Test
public void simpleProducer() throws Exception {// 创建一个生产者 (制定一个组名)DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");// 连接namesrvproducer.setNamesrvAddr("localhost:9876");// 启动producer.start();// 创建一个消息Message message = new Message("testTopic", "我是一个简单的消息".getBytes());// 发送消息SendResult sendResult = producer.send(message);// 这是个同步消息,所以这里可以拿到消息的消费状态System.out.println(sendResult.getSendStatus());// 关闭生产者producer.shutdown();
}
- dashboard客户端界面,查看主题界面可以看到刚刚创建的testTopic主题
- 如下状态按钮进入的页面,默认四个队列,目前队列1有一条消息未消费
- 如下CONSUMER(消费者)管理按钮进入的页面,目前还没创建消费者
2、消费者接受消息
- 消费监听MessageListenerConcurrently是多线程消费,默认20个线程
- 返回消费状态
RECONSUME_LATER
、报错
、null
- 消息会重新回到队列,之后重试发送给消费者
@Test
public void testConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe("testTopic", "*");// 注册一个消费监听 MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + "----" + msgs);// 返回消费的状态 如果是CONSUME_SUCCESS 则成功,若为RECONSUME_LATER则该条消息会被重回队列,重新被投递// 重试的时间为messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h// 也就是第一次1s 第二次5s 第三次10s .... 如果重试了18次 那么这个消息就会被终止发送给消费者// return ConsumeConcurrentlyStatus.RECONSUME_LATER;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 这个start一定要写在registerMessageListener下面consumer.start();System.in.read();
}
- 状态栏这里记录的是生产者发送的条数,所以这里没有变化
- CONSUMER(消费者)管理栏记录消费的情况
3、代理者位点和消费者位点
- 生产者发送10条消息,消费者还没接受,则是如下
- 以队列2为例
- 在队列中,生产者发送一个消息,代理者位点,向左移动一位
- 消费者接收一个消息,消费者位点,向左移动一位,
如果消费者异常,则不移动
,之后还会给消费者再发此消息 - 差值则是代理者位点减去消费者位点,也就是
等待发送
给消费者的消息数量
- 开启消费者将消息全部接收后,差值为0
二、消费模型特点
1、同一个消费组的不同消费者,订阅主题必须相同
- 如果订阅主题不同,那么两个消费者将
接收不到
来自不同主题的的消息 - 只有关闭其中一个消费者,另外一个消费者就能正常运行了
2、不同消费者组订阅同一主题,都会收到一份消息
- 订阅的消费者组一定会收到消息,但是具体的消费者不一样可以收到
- 消费者组的策略
- 负载均衡模式:同一组内消费者轮训获取到消息
- 广播模式:同一组内消费者都能获取到消息
3、消费者组内负载均衡模式,消费者固定队列接收消息
- 如果有c1/c2两个消费者,4个通道,那么c1只接收0/1队列的消息,c2只接收2/3队列的消息
- 也就是系统会给消费者
尽量平均分配
可以接收的队列,没有分配的队列消息不会接收 - 如果消费者多,那么多出来的消费者
永远
接收不到消息
4、消费模式
- MQ的消费模式可以大致分为两种,一种是
推Push
,一种是拉Pull
- Push是服务端【MQ】主动推送消息给客户端
- 优点是
及时性
较好 - 但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端
消息堆积
甚至崩溃
- 优点是
- Pull是客户端需要主动到服务端取数据
- 优点是客户端可以依据自己的消费能力进行消费
- 但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时
- Push模式也是基于Pull模式的,只是客户端内部封装了api(长轮训方式)
- 一般场景下,上游消息生产量
小
或者均速的时候,选择push模式
- 在特殊场景下,例如电商
大
促,抢优惠券等场景可以选择pull模式
- 一般场景下,上游消息生产量
三、发送不同类型消息
1、发送同步消息
- 上面的快速入门就是发送同步消息
- 发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认
- 这种方式非常安全,但是性能上并没有这么高
- 而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回
- 针对
重要的消息
可以选择这种方式
2、发送异步消息
- 异步消息通常用在对响应时间敏感的业务场景
- 即发送端不能容忍长时间地等待Broker的响应
- 发送完以后会有一个异步消息通知
- 与同步发送相比,send方法没有返回值
- 通过回调接口SendCallback获取发送成功还是失败
@Test
public void asyncProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}@Overridepublic void onException(Throwable e) {System.err.println("发送失败:" + e.getMessage());}});System.out.println("我先执行");System.in.read();
}
执行结果:
我先执行
发送成功
- 因为是异步,所以发送消息后,不论成功失败,继续往下走,执行“我先发送”
- 之后消费发送成功,回调函数执行“发送成功”
3、发送单向消息
- 这种方式主要用在不关心发送结果的场景
- 这种方式
吞吐量很大
,但是存在消息丢失
的风险 - 例如日志信息的发送
@Test
public void onewayProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("onewayTopic", "日志xxx".getBytes());producer.sendOneway(message);System.out.println("成功");producer.shutdown();
}
4、发送延迟消息
- 消息放入mq后,过一段时间,才会被监听到,然后消费
- 比如抢票业务
- 一个人抢到票,无论付款与否,都发送一个延时消息(车票id,此时状态为占用)
- 15分钟后, 去处理这里车票id
- 通过车票id判断是否付款,如果付款则什么都不处理
- 如果没有付款则将此车票id状态修改位未占用
@Test
public void msProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);producer.start();Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());// 给消息设置一个延迟时间message.setDelayTimeLevel(3);// 发延迟消息producer.send(message);producer.shutdown();
}
- 通过message对象设置延迟对象,如下等级对应延迟时间
- 当然以后springboot项目也可以通过配置文件属性
delayTimeLevel
自定义时间
5、发送批量消息
- 可以一次性发送一组消息
@Test
public void testBatchProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();List<Message> msgs = Arrays.asList(new Message("batchTopic", "我是一组消息的A消息".getBytes()),new Message("batchTopic", "我是一组消息的B消息".getBytes()),new Message("batchTopic", "我是一组消息的C消息".getBytes()));SendResult send = producer.send(msgs);System.out.println(send);// 关闭实例producer.shutdown();
}
- 一组消息都在同
一个队列
里,排队消费
- 一组3个消息,多线程一次消费了三个消息
@Test
public void testBatchConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-producer-group");// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 表达式,默认是*consumer.subscribe("batchTopic", "*");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 这里执行消费的代码 默认是多线程消费System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
执行结果:
ConsumeMessageThread_3----我是一组消息的C消息
ConsumeMessageThread_1----我是一组消息的A消息
ConsumeMessageThread_2----我是一组消息的B消息
6、发送顺序消息
- 消息有序指的是可以按照消息的
发送顺序
来消费 - 虽然队列FIFO先入先出,但RocketMQ的broker有四个queue
- 默认的情况下消息发送会采取
轮询
方式把消息发送到不同
的queue - 而消费消息的时候从多个queue上拉取消息
- 这种情况发送和消费是不能保证顺序
- 默认的情况下消息发送会采取
- 但是如果控制发送的顺序消息只依次发送到
同一个queue中
- 消费的时候只从这个queue上
依次拉取
,则就保证了顺序
- 消费的时候只从这个queue上
- 当发送和消费参与的queue只有一个,则是全局有序
- 如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的
发送顺序消息
- 两组订单,110订单和120订单
- 每组需要保证顺序:下订单->物流->签收
- 发送消息的send方法需要传一个MessageQueueSelector的实现类
实现select方法,返回MessageQueue对象(当前send消息放入哪个队列
)
@Test
public void testOrderlyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();List<Order> orderList = Arrays.asList(new Order(1, 110, "下订单"),new Order(2, 110, "物流"),new Order(3, 110, "签收"),new Order(4, 120, "下订单"),new Order(5, 120, "物流"),new Order(6, 120, "拒收"));// 循环集合开始发送orderList.forEach(order -> {Message message = new Message("TopicTest", order.toString().getBytes());try {// 发送的时候 相同的订单号选择同一个队列producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 当前主题有多少个队列int queueNumber = mqs.size();// 这个arg就是后面传入的 order.getOrderNumber()Integer i = (Integer) arg;// 用这个值去%队列的个数得到一个队列int index = i % queueNumber;// 返回选择的这个队列即可 ,那么相同的订单号 就会被放在相同的队列里 实现FIFO了return mqs.get(index);}}, order.getOrderNumber());} catch (Exception e) {System.out.println("发送异常");}});// 关闭实例producer.shutdown();
}
接收顺序消息
- 因为要顺序消费,所以不能用默认的
MessageListenerConcurrently多线程消费
- 这里需要用到
MessageListenerOrderly单线程消费
@Test
public void testOrderlyConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息consumer.subscribe("TopicTest", "*");// 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {MessageExt messageExt = msgs.get(0);System.out.println(new String(messageExt.getBody()));return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.in.read();
}
7、发送带标签的消息
- RocketMQ提供消息过滤功能,通过tag进行区分
- 订阅关系一致:同一个
消费者组
下所有消费者实例所订阅的Topic、Tag必须完全一致
- 我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分
- 比如带有tagA标签的被A消费,带有tagB标签的被B消费
生产者发送标签消息(同一个主题,不同的标签
)
@Test
public void tagProducer() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");producer.setNamesrvAddr("localhost:9876");producer.start();Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());producer.send(message);producer.send(message2);System.out.println("发送成功");producer.shutdown();
}
- 消费者组a只监听主题为tagTopic,标签为vip1
- subscribe订阅方法第二个参数,默认
*
监听所有的标签
@Test
public void tagConsumer1() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("tagTopic", "vip1");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
- 消费者组b监听主题为tagTopic,标签为vip1或vip2
@Test
public void tagConsumer2() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("tagTopic", "vip1 || vip2");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
什么时候该用 Topic,什么时候该用 Tag?
总结:不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分
可以从以下几个方面进行判断:
消息类型是否一致
:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分业务是否相关联
:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分消息优先级是否一致
:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。消息量级是否相当
:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic
通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息
8、发送带key的消息
- 在rocketmq中的消息,默认会有一个
messageId
当做消息的全局唯一标识
- 我们也可以给消息携带一个key,用作
业务唯一标识
- 如果发送两次相同内容消息
- 业务唯一标识key肯定一样,可以阻止重复消费
- 但是上面默认的messageId则不一样,这样则无法区分重复数据
带key消息生产者
@Test
public void testKeyProducer() throws Exception {// 创建默认的生产者DefaultMQProducer producer = new DefaultMQProducer("test-group");// 设置nameServer地址producer.setNamesrvAddr("localhost:9876");// 启动实例producer.start();Message msg = new Message("TopicTest","我是一个带key的消息".getBytes());// 通过Message对象设置keyString key = UUID.randomUUID().toString();msg.setKeys(key);SendResult send = producer.send(msg);System.out.println(send);// 关闭实例producer.shutdown();
}
带key消息消费者(从MessageExt对象中获取key
)
@Test
public void testKeyConsumer() throws Exception {// 创建默认消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");// 设置nameServer地址consumer.setNamesrvAddr("localhost:9876");// 订阅一个主题来消费 consumer.subscribe("TopicTest","*");// 注册一个消费监听 MessageListenerConcurrently是并发消费// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);// 从MessageExt对象获取keySystem.out.println("key值: " + messageExt.getKeys());System.out.println("消息体: " + new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.in.read();
}
根据主题和key查询消息