Broker存储事务消息
在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。3个核心的初始化变量
1.TransactionalMessageService.
事务消息主要用于处理服务,默认实现类是TransactionalMessageServiceImpl.如果想自定义事务消息
处理实现类,需要实现TransactionMessageService接口,然后通过ServiceProvider.loadClass()方法进行加载。
TransactionalMessageService接口的基本操作定义如下
- prepareMessage():用于保存Half事务消息,用户可以对其进行Commit或Rollback
- deletePrepareMessage():用于删除事务消息,一般用于Broker回查失败的Half消息。
- commitMessage():用于提交事务消息,使消费者可以正常地消费事务消息
- rollbackMessage():用于回滚事务消息,回滚后消费者将不能够消费该消息。通常用于生产者主动进行Rollback时,以及Broker回查的生产者本地事务失败时
- open():用于打开事务服务
- close():用于关闭事务服务
2.transactionMessageCheckListener.
事务消息回查监听器,默认实现类是DefaultTransactionalMessageCheckListener.如果想自定义回查监听处理,需要继承AbstractTransactionalMessageCheckListener接口,然后通过ServiceProvider.loadClass()方法被加载
3.transactionalMessageCheckService.
事务消息回查服务是一个线程服务,定时调用transactionalMessageService.check()方法,检查超时的Half消息
是否需要回查
额外两个单独处理
上面三个事务处理类完成初始化后,Broker就可以处理事务消息了。
Broker存储事务消息和普通消息都是通过SendMessageProcessor类进行处理的,只是在存储消息时有两处事务消息需要单独处理。
第一个单独处理,sendMessage()
这里获取消息中的扩展字段MessageConst.PROPERTY_TRANSACTION_PREPARED的值,
如果该值为True则返回当前消息是事务消息;再判断当前Broker的配置是否支持事务消息,如果不支持就返回生产者不支持事务消息的信息;如果支持,则调用TransactionalMessageService#prepareMessage()方法保存Half消息
第二个单独处理:存储前事务消息预处理,处理方法是TransactionalMessageBridge.praseHalfMessageInner()
该方法的功能是将原消息的Topic、queueId、susFlg存储在消息的扩展字段中,并且修改Topic的值为RMQ_SYS_TRANS_HALF_TOPIC,修改queueId的值为0,然后,与其他消息一样,调用DefaultMessageStore.putMessage()方法保存到CommitLog中,CommitLog存储成功后,通过CommitLog.DefaultAppendMessageCallback.doAppend()方法单独对事务消息进行处理
Prepared消息其实就是Half消息,其实现逻辑是,设置当前Half消息的
queueOffset值为0,而不是其真实的位点值。这样该位点就不会建立ConsumeQueue索引,自然也不能被消费者消费