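1 Preface
In the previous part we walked through how a message producer starts up and how a message is sent. That part was long and some details were left out, such as the Broker fault-delay (latency fault tolerance) mechanism, so this part looks at it on its own.
One thing to know up front: the mechanism is off by default.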
// ClientConfig
/**
 * Enable the fault tolerance mechanism of the client sending process.
 * DO NOT OPEN when ORDER messages are required.
 * Turning on will interfere with the queue selection functionality,
 * possibly conflicting with the order message.
 *
 * Note: it takes effect when a message queue is selected, so do not enable it for ordered messages.
 * SEND_LATENCY_ENABLE = com.rocketmq.sendLatencyEnable
 * START_DETECTOR_ENABLE = com.rocketmq.startDetectorEnable
 */
private boolean sendLatencyEnable = Boolean.parseBoolean(System.getProperty(SEND_LATENCY_ENABLE, "false"));
private boolean startDetectorEnable = Boolean.parseBoolean(System.getProperty(START_DETECTOR_ENABLE, "false"));
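Since both flags are read from system properties, one way to switch them on is to set those properties before the producer's ClientConfig is created. The sketch below only mirrors the property lookup quoted above and does not touch RocketMQ itself; the class name and the readFlag helper are hypothetical.

```java
public class EnableFaultToleranceSketch {
    // Property names as listed in the ClientConfig Javadoc quoted above.
    static final String SEND_LATENCY_ENABLE = "com.rocketmq.sendLatencyEnable";
    static final String START_DETECTOR_ENABLE = "com.rocketmq.startDetectorEnable";

    /** Hypothetical helper: same lookup as the quoted field initializers, default "false". */
    static boolean readFlag(String key) {
        return Boolean.parseBoolean(System.getProperty(key, "false"));
    }

    public static void main(String[] args) {
        // Must happen before the config object is constructed,
        // because the fields are initialized once at construction time.
        System.setProperty(SEND_LATENCY_ENABLE, "true");
        System.setProperty(START_DETECTOR_ENABLE, "true");
        System.out.println("sendLatencyEnable=" + readFlag(SEND_LATENCY_ENABLE));
        System.out.println("startDetectorEnable=" + readFlag(START_DETECTOR_ENABLE));
    }
}
```

The same effect can be had with JVM flags such as -Dcom.rocketmq.sendLatencyEnable=true. Either way, keep it off for ordered messages, as the Javadoc warns.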
2 Selecting a message queue
When DefaultMQProducerImpl selects a message queue, it delegates the work to MQFaultStrategy:
// DefaultMQProducerImpl#selectOneMessageQueue: select a message queue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName, resetIndex);
}
That leads to the method we saw in the previous part:
// MQFaultStrategy#selectOneMessageQueue
// tpInfo: route info obtained by tryToFindTopicPublishInfo
// lastBrokerName: name of the Broker chosen on the previous attempt, whose send failed; it feeds the Broker filter
// resetIndex: false on the first send, true on retries
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName, final boolean resetIndex) {
    // Get the current thread's Broker filter
    BrokerFilter brokerFilter = threadBrokerFilter.get();
    // Point the filter at the Broker that failed last time
    brokerFilter.setLastBrokerName(lastBrokerName);
    // Is the Broker fault-delay mechanism enabled? Off by default
    if (this.sendLatencyFaultEnable) {
        // Reset the index of sendWhichQueue inside TopicPublishInfo
        if (resetIndex) {
            tpInfo.resetIndex();
        }
        // Try to pick a queue that passes both the availability filter and the Broker filter
        MessageQueue mq = tpInfo.selectOneMessageQueue(availableFilter, brokerFilter);
        if (mq != null) {
            return mq;
        }
        // If that fails, try a queue that passes both the reachability filter and the Broker filter
        mq = tpInfo.selectOneMessageQueue(reachableFilter, brokerFilter);
        if (mq != null) {
            return mq;
        }
        // If both fail, fall back to any queue
        return tpInfo.selectOneMessageQueue();
    }
    // Fault-delay mechanism disabled: select using the Broker filter only
    MessageQueue mq = tpInfo.selectOneMessageQueue(brokerFilter);
    if (mq != null) {
        return mq;
    }
    // If that fails, fall back to any queue
    return tpInfo.selectOneMessageQueue();
}
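This is the core idea of message queue selection:
With fault tolerance disabled, the producer first picks a queue that is not on the Broker where the current thread's last send failed. lastBrokerName can also be null (on the first send, for example); in that case the filter accepts every queue, so the result is much like the no-argument selectOneMessageQueue below: take whichever queue the thread's index points to, and that index starts from a random value.
With fault tolerance enabled, there are roughly four steps:
(1) On a retry, reset the current thread's index.
(2) Select a queue using availableFilter plus the Broker-name filter.
(3) If step 2 returns nothing, select again using reachableFilter plus the Broker-name filter.
(4) If step 3 returns nothing, fall back to the no-argument selectOneMessageQueue and take any queue.
Both paths end in the same fallback: a queue is always chosen one way or another. The enabled path adds two rounds of filtering, and those are the focus of this part.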
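The tiered fallback can be sketched in isolation. This is a simplified model, not RocketMQ's implementation: brokers are plain strings, the available and reachable sets stand in for the real filters, and the rule that the Broker filter rejects only lastBrokerName is an assumption about its behavior.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class TieredSelectionSketch {
    /** Returns the first broker that passes every filter, or null if none does. */
    @SafeVarargs
    static String pick(List<String> brokers, Predicate<String>... filters) {
        for (String broker : brokers) {
            boolean ok = true;
            for (Predicate<String> f : filters) {
                ok &= f.test(broker);
            }
            if (ok) {
                return broker;
            }
        }
        return null;
    }

    /** Tier 1: available + not last failed. Tier 2: reachable + not last failed. Tier 3: anything. */
    static String select(List<String> brokers, Set<String> available,
                         Set<String> reachable, String lastBrokerName) {
        Predicate<String> availableFilter = available::contains;
        Predicate<String> reachableFilter = reachable::contains;
        // Assumed behavior: with no previous failure the filter accepts everything.
        Predicate<String> brokerFilter = b -> lastBrokerName == null || !b.equals(lastBrokerName);

        String chosen = pick(brokers, availableFilter, brokerFilter);
        if (chosen != null) {
            return chosen;
        }
        chosen = pick(brokers, reachableFilter, brokerFilter);
        if (chosen != null) {
            return chosen;
        }
        return brokers.get(0); // last resort: some queue must be returned
    }
}
```

The point of the ordering is that a Broker that is merely slow (unavailable for a while but still reachable) is preferred over one that cannot be reached at all.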
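3 Fault tolerance mechanism
3.1 Core classes
Let's first review the core classes involved:
MQFaultStrategy: the entry point and main control logic for queue selection; its internal latencyFaultTolerance field connects it to the fault data used to filter queues.
LatencyFaultTolerance interface: maintains the fault information used to filter queues. The implementation is LatencyFaultToleranceImpl, whose faultItemTable holds the Brokers currently recorded as faulty.
FaultItem: a single fault record.
TopicPublishInfo: picks, from its own queue list, a queue that satisfies the given filters.
I drew a diagram of how these classes work together:
To follow the flow, it helps to know where each core class's key fields come from, so let's look at those next.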
3.2 TopicPublishInfo
The filter-based selection method relies on two fields, messageQueueList and sendWhichQueue. Let's see where they come from.
// TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(QueueFilter ...filter) {
    return selectOneMessageQueue(this.messageQueueList, this.sendWhichQueue, filter);
}
Start with messageQueueList. The TopicPublishInfo comes from tryToFindTopicPublishInfo, which fetches the route info for a Topic. The remote response is a TopicRouteData, and MQClientInstance#topicRouteData2TopicPublishInfo converts it:
// MQClientInstance#topicRouteData2TopicPublishInfo
// Converts the TopicRouteData returned by the route request into the TopicPublishInfo used by producers
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    // List<MessageQueue> messageQueueList = new ArrayList<>();
    TopicPublishInfo info = new TopicPublishInfo();
    info.setTopicRouteData(route);
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        // Ordered-topic configuration
        ...
    } else if (route.getOrderTopicConf() == null
        && route.getTopicQueueMappingByBroker() != null
        && !route.getTopicQueueMappingByBroker().isEmpty()) {
        // Static-topic configuration
        ...
    } else {
        // Normal-topic configuration: this is the branch we care about
        // Queue data for the current Topic
        List<QueueData> qds = route.getQueueDatas();
        // Sort (ascending by Broker name)
        Collections.sort(qds);
        for (QueueData qd : qds) {
            // Only writable queues; the check is (perm & 2) == 2
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                // Find the Broker that owns this queue data
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) {
                        brokerData = bd;
                        break;
                    }
                }
                // No matching Broker: skip
                if (null == brokerData) {
                    continue;
                }
                // The Broker has no master address: skip as well
                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }
                // One MessageQueue per write queue (4 by default)
                for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    // Add it to info's messageQueueList
                    info.getMessageQueueList().add(mq);
                }
            }
        }
        // Not an ordered Topic
        info.setOrderTopic(false);
    }
    return info;
}
In short: it takes the Topic's distribution across Brokers, creates one MessageQueue object per writable queue, and stores them all in TopicPublishInfo's messageQueueList.
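As a rough illustration of that conversion, the sketch below builds a queue list from plain arrays instead of TopicRouteData. The string format and the build helper are hypothetical; only the writable check, (perm & 2) == 2, is taken from the comment in the code above.

```java
import java.util.ArrayList;
import java.util.List;

public class QueueListSketch {
    static final int PERM_WRITE = 2; // write bit, per the (perm & 2) == 2 check

    static boolean isWriteable(int perm) {
        return (perm & PERM_WRITE) == PERM_WRITE;
    }

    /**
     * Hypothetical stand-in for the route conversion: entry i describes one broker
     * by name, permission bits, and number of write queues.
     */
    static List<String> build(String topic, String[] brokers, int[] perms, int[] writeQueueNums) {
        List<String> queues = new ArrayList<>();
        for (int b = 0; b < brokers.length; b++) {
            if (!isWriteable(perms[b])) {
                continue; // non-writable queue data contributes nothing
            }
            for (int i = 0; i < writeQueueNums[b]; i++) {
                queues.add(topic + "@" + brokers[b] + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        // Two writable brokers with 4 write queues each.
        System.out.println(build("TopicTest", new String[] {"broker-a", "broker-b"},
            new int[] {6, 6}, new int[] {4, 4}));
    }
}
```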
Now sendWhichQueue: it is a volatile member field, created when the TopicPublishInfo is instantiated:
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
}
And the ThreadLocalIndex class:
public class ThreadLocalIndex {
    // Thread-local counter
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    // Random source
    private final Random random = new Random();
    private final static int POSITIVE_MASK = 0x7FFFFFFF;

    public int incrementAndGet() {
        // Get the current thread's counter
        Integer index = this.threadLocalIndex.get();
        // First use on this thread: start from a random number
        if (null == index) {
            index = random.nextInt();
        }
        // Store the incremented value back into the thread-local
        this.threadLocalIndex.set(++index);
        // Bitwise AND with POSITIVE_MASK clears the sign bit, so the result is non-negative
        return index & POSITIVE_MASK;
    }

    // Reset
    public void reset() {
        // Pick a new random starting value
        int index = Math.abs(random.nextInt(Integer.MAX_VALUE));
        if (index < 0) {
            index = 0;
        }
        this.threadLocalIndex.set(index);
    }
}
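messageQueueList holds the Topic's queues across all Brokers. For example, if the Topic lives on two Brokers and each Broker has 4 write queues, the list has 8 entries, 4 per Broker.
sendWhichQueue: a per-thread index kept in a ThreadLocal. It starts from a random number and is incremented on every call to incrementAndGet.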
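To see why a random start plus an increment gives round-robin behavior, here is a minimal sketch of the counter together with an assumed usage: taking the value modulo the queue count. The nextSlot helper is hypothetical and only illustrates how such an index would map onto the 8-entry list from the example.

```java
import java.util.Random;

public class ThreadLocalIndexSketch {
    private static final int POSITIVE_MASK = 0x7FFFFFFF;
    private final ThreadLocal<Integer> index = new ThreadLocal<>();
    private final Random random = new Random();

    /** Same shape as the quoted incrementAndGet: random start, then +1 per call. */
    int incrementAndGet() {
        Integer i = index.get();
        if (i == null) {
            i = random.nextInt();
        }
        index.set(++i);
        return i & POSITIVE_MASK; // clear the sign bit so the value is never negative
    }

    /** Hypothetical helper: map the counter onto a slot of the queue list. */
    int nextSlot(int queueCount) {
        return incrementAndGet() % queueCount;
    }

    public static void main(String[] args) {
        ThreadLocalIndexSketch idx = new ThreadLocalIndexSketch();
        // Eight consecutive calls on one thread walk through neighboring slots.
        for (int i = 0; i < 8; i++) {
            System.out.print(idx.nextSlot(8) + " ");
        }
        System.out.println();
    }
}
```

Each thread keeps its own counter, so concurrent senders spread across queues without contending on a shared atomic.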
3.3 FaultItem: fault details
Let's look at this class:
public class FaultItem implements Comparable<FaultItem> {
    // Broker name, i.e. which Broker is faulty
    private final String name;
    // Current latency
    private volatile long currentLatency;
    // Recovery time: once System.currentTimeMillis() passes startTimestamp the Broker is no longer treated as faulty
    private volatile long startTimestamp;
    // Next check time, used with detect: a scheduled thread in LatencyFaultToleranceImpl
    // checks whether faulty Brokers have recovered (covered below)
    private volatile long checkStamp;
    // Whether the Broker is reachable
    private volatile boolean reachableFlag;

    public FaultItem(final String name) {
        this.name = name;
    }

    // Update the unavailable duration
    public void updateNotAvailableDuration(long notAvailableDuration) {
        // If now + duration is later than the recovery time set previously, the recovery time must be pushed back
        if (notAvailableDuration > 0 && System.currentTimeMillis() + notAvailableDuration > this.startTimestamp) {
            // Set the new recovery time
            this.startTimestamp = System.currentTimeMillis() + notAvailableDuration;
            log.info(name + " will be isolated for " + notAvailableDuration + " ms.");
        }
    }
    ...
}
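The recovery-time rule is easier to check with an explicit clock. The sketch below passes the current time in as a parameter instead of calling System.currentTimeMillis(); the isAvailable method is an assumption about how startTimestamp is consumed, based on the field comment above.

```java
public class FaultWindowSketch {
    // Time (ms) from which the broker counts as usable again.
    private long startTimestamp;

    /** The recovery time only ever moves forward, never backward. */
    void updateNotAvailableDuration(long now, long notAvailableDuration) {
        if (notAvailableDuration > 0 && now + notAvailableDuration > startTimestamp) {
            startTimestamp = now + notAvailableDuration;
        }
    }

    /** Assumed check: usable once the clock reaches the recovery time. */
    boolean isAvailable(long now) {
        return now >= startTimestamp;
    }

    long getStartTimestamp() {
        return startTimestamp;
    }
}
```

A consequence of the forward-only rule: a fast response that arrives while a Broker is isolated does not shorten the isolation window, because a duration of 0 is ignored.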
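name, currentLatency, and startTimestamp are fairly self-explanatory. Let's look at where the values of checkStamp and reachableFlag come from.
First, checkStamp. It is tied to detection: LatencyFaultToleranceImpl has a startDetector method that submits a scheduled task to check the fault records: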
// LatencyFaultToleranceImpl#startDetector
// private volatile boolean startDetectorEnable = false;
// private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
//     @Override
//     public Thread newThread(Runnable r) {
//         return new Thread(r, "LatencyFaultToleranceScheduledThread");
//     }
// });
public void startDetector() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // Is detection enabled? Off by default
                if (startDetectorEnable) {
                    detectByOneRound();
                }
            } catch (Exception e) {
                log.warn("Unexpected exception raised while detecting service reachability", e);
            }
        }
    // Runs every 3 seconds
    }, 3, 3, TimeUnit.SECONDS);
}

// private int detectTimeout = 200;    probe timeout
// private int detectInterval = 2000;  interval between probes of one item, 2 seconds by default
// private final ServiceDetector serviceDetector;  the probe itself
public void detectByOneRound() {
    for (Map.Entry<String, FaultItem> item : this.faultItemTable.entrySet()) {
        FaultItem brokerItem = item.getValue();
        // This is where checkStamp matters: it starts at 0 and is pushed forward after every check
        if (System.currentTimeMillis() - brokerItem.checkStamp >= 0) {
            // Next check time = now + detectInterval (2 seconds)
            brokerItem.checkStamp = System.currentTimeMillis() + this.detectInterval;
            // Resolve the Broker's address
            String brokerAddr = resolver.resolve(brokerItem.getName());
            // No address means the Broker no longer exists: remove the record
            if (brokerAddr == null) {
                faultItemTable.remove(item.getKey());
                continue;
            }
            // No detector configured: nothing to probe with, so skip
            if (null == serviceDetector) {
                continue;
            }
            // Probe whether the Broker has recovered
            boolean serviceOK = serviceDetector.detect(brokerAddr, detectTimeout);
            // If so, set reachableFlag = true so messages can be sent to it again
            if (serviceOK && !brokerItem.reachableFlag) {
                log.info(brokerItem.name + " is reachable now, then it can be used.");
                brokerItem.reachableFlag = true;
            }
        }
    }
}
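The interplay between the 3-second schedule and the per-item checkStamp can be modeled without any scheduler. In this sketch the clock, the resolver, and the probe are all injected, and the Item class is a cut-down stand-in for FaultItem; these names are hypothetical and the code is a model of the round described above, not the library's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

public class DetectRoundSketch {
    /** Cut-down stand-in for FaultItem. */
    static final class Item {
        volatile long checkStamp;   // earliest time of the next probe, starts at 0
        volatile boolean reachable;
    }

    final Map<String, Item> table = new ConcurrentHashMap<>();
    final long detectInterval = 2000;

    /** Runs one detection pass at time `now` and returns how many probes were issued. */
    int detectOnce(long now, Function<String, String> resolver, Predicate<String> probe) {
        int probes = 0;
        for (Map.Entry<String, Item> e : table.entrySet()) {
            Item item = e.getValue();
            if (now - item.checkStamp < 0) {
                continue; // probed recently, not due yet
            }
            item.checkStamp = now + detectInterval;
            String addr = resolver.apply(e.getKey());
            if (addr == null) {
                table.remove(e.getKey()); // broker no longer resolvable: drop the record
                continue;
            }
            probes++;
            if (probe.test(addr) && !item.reachable) {
                item.reachable = true;
            }
        }
        return probes;
    }
}
```

Removing entries while iterating is safe here because ConcurrentHashMap iterators are weakly consistent and do not throw ConcurrentModificationException.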
While we are here, let's look at serviceDetector and at the entry point that starts detection, startDetector.
serviceDetector is passed in when MQFaultStrategy is instantiated:
public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector serviceDetector) {
    this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, serviceDetector);
    this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
    this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
    this.setStartDetectorEnable(cc.isStartDetectorEnable());
    this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
}
And MQFaultStrategy is in turn instantiated when DefaultMQProducerImpl is instantiated:
// DefaultMQProducerImpl
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    ...
    // The probe: pick a topic and ask the endpoint for a max offset; any exception counts as "not OK"
    ServiceDetector serviceDetector = new ServiceDetector() {
        @Override
        public boolean detect(String endpoint, long timeoutMillis) {
            Optional<String> candidateTopic = pickTopic();
            if (!candidateTopic.isPresent()) {
                return false;
            }
            try {
                MessageQueue mq = new MessageQueue(candidateTopic.get(), null, 0);
                mQClientFactory.getMQClientAPIImpl().getMaxOffset(endpoint, mq, timeoutMillis);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
    };
    // Instantiate MQFaultStrategy
    this.mqFaultStrategy = new MQFaultStrategy(defaultMQProducer.cloneClientConfig(), new Resolver() {
        @Override
        public String resolve(String name) {
            return DefaultMQProducerImpl.this.mQClientFactory.findBrokerAddressInPublish(name);
        }
    }, serviceDetector);
}
Now the entry point of startDetector. In the same way, when the producer starts it calls MQFaultStrategy's startDetector, which in turn calls LatencyFaultToleranceImpl's startDetector.
// DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            ...
            if (startFactory) {
                mQClientFactory.start();
            }
            // Start MQFaultStrategy's detection
            this.mqFaultStrategy.startDetector();
            ...
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    RequestFutureHolder.getInstance().startScheduledTask(this);
}

// MQFaultStrategy#startDetector
public void startDetector() {
    this.latencyFaultTolerance.startDetector();
}
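That covers checkStamp and the entry point that starts detection: checkStamp throttles how often the scheduled task probes each fault record, and the task marks recovered Brokers as reachable again and drops records whose Broker can no longer be resolved. Next, reachableFlag. Its value changes through the updateFaultItem method: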
// LatencyFaultToleranceImpl#updateFaultItem
// name: Broker name
// currentLatency: measured latency
// notAvailableDuration: how long the Broker should be treated as unavailable
// reachable: whether the Broker is reachable
// private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration, final boolean reachable) {
    // Look up the current Broker in the map
    FaultItem old = this.faultItemTable.get(name);
    // Not present: create a record and try to insert it
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.updateNotAvailableDuration(notAvailableDuration);
        faultItem.setReachable(reachable);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
    }
    // Present (or inserted concurrently by another thread): update it in place
    if (null != old) {
        old.setCurrentLatency(currentLatency);
        old.updateNotAvailableDuration(notAvailableDuration);
        old.setReachable(reachable);
    }
    // Unreachable: log it
    if (!reachable) {
        log.info(name + " is unreachable, it will not be used until it's reachable");
    }
}
So who calls updateFaultItem? MQFaultStrategy does:
// MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, final boolean reachable) {
    // Only when the fault-delay mechanism is enabled
    if (this.sendLatencyFaultEnable) {
        // If isolation is true, compute the duration as if the latency were 10000 ms
        // (which maps to a 10-second isolation in the table below);
        // otherwise use the measured currentLatency
        long duration = computeNotAvailableDuration(isolation ? 10000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration, reachable);
    }
}

// Map a latency to an unavailable duration
// private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
// private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i]) {
            return this.notAvailableDuration[i];
        }
    }
    return 0;
}
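A few worked values make the lookup concrete. The sketch below copies the two arrays from the comments above into a standalone class (the class name is hypothetical) and scans from the largest threshold down, as the quoted method does.

```java
public class NotAvailableDurationSketch {
    // Values copied from the comments in the quoted MQFaultStrategy code.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 2000L, 5000L, 6000L, 10000L, 30000L};

    /** Finds the highest threshold the latency reaches and returns its paired duration. */
    static long compute(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {40L, 600L, 3000L, 10000L, 20000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> isolate for " + compute(latency) + " ms");
        }
    }
}
```

With these tables, a latency under 550 ms causes no isolation at all, and the fixed 10000 ms used when isolation is true falls in the 5000 to 15000 band, which is why it yields a 10-second window.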
And who calls that? DefaultMQProducerImpl:
// DefaultMQProducerImpl#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation, boolean reachable) {
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation, reachable);
}
And who calls that in turn? The core send method:
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    ...
    // Fetch the route info for the current Topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        boolean resetIndex = false;
        for (; times < timesTotal; times++) {
            // Select a queue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName, resetIndex);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // 1. The send succeeded, so the Broker is reachable
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 2. Client-side exception: the request never went out, so still reachable
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    // Remoting exception: not certain the Broker is at fault
                    if (this.mqFaultStrategy.isStartDetectorEnable()) {
                        // 3. Detection is enabled: mark unreachable, since the detector will restore it later
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
                    } else {
                        // 4. Cause unclear and nothing would restore it: keep it reachable
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, true);
                    }
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 5. The Broker itself reported an error: unreachable
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true, false);
                    log.warn("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    exception = e;
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }
                        throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    // 6. Interrupted: reachable
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false, true);
                    log.warn("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq, e);
                    if (log.isDebugEnabled()) {
                        log.debug(msg.toString());
                    }
                    throw e;
                }
            } else {
                break;
            }
        }
        ...
        throw mqClientException;
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
In total there are about 6 places that update the reachable state.
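Those call sites reduce to a small decision table over the send outcome and whether the detector is enabled. The enum and method below are hypothetical names used only to restate the (isolation, reachable) arguments passed in the send method above.

```java
public class OutcomeFlagsSketch {
    /** Hypothetical labels for the outcomes handled in the send loop. */
    enum Outcome { SUCCESS, CLIENT_EXCEPTION, REMOTING_EXCEPTION, BROKER_EXCEPTION, INTERRUPTED }

    /** Returns {isolation, reachable} as passed to updateFaultItem for each outcome. */
    static boolean[] flags(Outcome outcome, boolean detectorEnabled) {
        switch (outcome) {
            case REMOTING_EXCEPTION:
                // Isolate either way; mark unreachable only if a detector can restore it later.
                return new boolean[] {true, !detectorEnabled};
            case BROKER_EXCEPTION:
                return new boolean[] {true, false};
            case SUCCESS:
            case CLIENT_EXCEPTION:
            case INTERRUPTED:
            default:
                return new boolean[] {false, true};
        }
    }

    public static void main(String[] args) {
        for (Outcome o : Outcome.values()) {
            boolean[] f = flags(o, true);
            System.out.println(o + ": isolation=" + f[0] + ", reachable=" + f[1]);
        }
    }
}
```

The remoting case is the only one that depends on configuration: without a detector, nothing would ever flip an unreachable Broker back, so the code leaves it reachable and relies on the isolation window alone.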
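So, for FaultItem: checkStamp works with detect to check the fault records, and reachableFlag is updated in the send method according to the exception that occurred.
Going through TopicPublishInfo and FaultItem has already taken us through MQFaultStrategy and LatencyFaultToleranceImpl along the way, so we won't cover those two separately.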
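4 Summary
This part took a closer look at the core classes behind the Broker fault-delay mechanism used during message queue selection, and at how they work together. If anything here is misunderstood, corrections are welcome.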