- 单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。
- 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。
- 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服务器的响应调用回调。
文章目录
- 1.send源码入口
- 1.1 同步消息
- 1.2 单向消息
- 1.3 异步消息
- 2.sendDefaultImpl发送消息实现
- 2.1 makeSureStateOK确定生产者服务状态
- 2.2 checkMessage校验消息的合法性
- 2.3 tryToFindTopicPublishInfo查找topic的发布信息
- 2.4 计算发送次数timesTotal
- 2.5 selectOneMessageQueue选择消息队列
- 2.6 sendKernelImpl发送消息
- 2.6.1 findBrokerAddressInPublish查找broker地址
- 2.6.2 brokerVIPChannel判断vip通道
- 2.6.3 setUniqID生成uniqId
- 2.6.4 tryToCompressMessage压缩消息
- 2.7 updateFaultItem更新故障表
- 2.7.1 computeNotAvailableDuration计算隔离时间
- 2.7.2 updateFaultItem更新故障表
- 3.总结
DefaultMQProducer提供了很多send方法的重载:
1.send源码入口
1.1 同步消息
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//调用defaultMQProducerImpl#send发送消息return this.defaultMQProducerImpl.send(msg);
}
defaultMQProducerImpl#send发送消息。
调用另一个send(), 超时时间为3s。
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//调用另一个send方法,设置超时时间参数,默认3000msreturn send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
/*** DefaultMQProducerImpl的方法** @param msg 消息* @param timeout 超时时间,毫秒值*/
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为nullreturn this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
该方法内部又调用另一个sendDefaultImpl方法, 设置消息发送方式为SYNC, 为同步, 设置回调函数为null。
1.2 单向消息
单向消息使用sendOneway发送。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException{//根据namespace设置topicmsg.setTopic(withNamespace(msg.getTopic()));//调用defaultMQProducerImpl#sendOneway发送消息this.defaultMQProducerImpl.sendOneway(msg);
}
defaultMQProducerImpl#sendOneway。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {try {//调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000msthis.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());} catch (MQBrokerException e) {throw new MQClientException("unknown exception", e);}
}
最终调用sendDefaultImpl方法, 发送模式为ONEWAY, 设置回调函数为null, 超时时间为3s。
1.3 异步消息
异步消息使用带有callback函数的send方法发送。
public void send(Message msg, SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException {//根据namespace设置topicmsg.setTopic(withNamespace(msg.getTopic()));//调用defaultMQProducerImpl#send发送消息,带有sendCallback参数this.defaultMQProducerImpl.send(msg, sendCallback);
}
该方法内部调用defaultMQProducerImpl#send方法发送消息, 带sendCallback参数。
public void send(Message msg, SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException {//该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException {//调用起始时间final long beginStartTime = System.currentTimeMillis();//获取异步发送执行器线程池ExecutorService executor = this.getAsyncSenderExecutor();try {/** 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息*/executor.submit(new Runnable() {@Overridepublic void run() {/** 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法*/long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {//调用sendDefaultImpl方法执行发送操作sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);} catch (Exception e) {//抛出异常,执行回调函数onException方法sendCallback.onException(e);}} else {//超时,执行回调函数onException方法sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}});} catch (RejectedExecutionException e) {throw new MQClientException("executor rejected ", e);}}
方法内部会获取获取异步发送执行器线程池, 使用线程池异步的执行sendDefaultImpl方法, 即异步发送。
发送之前计算超时时间, 如果超时则执行回调函数onException()。
2.sendDefaultImpl发送消息实现
该方法位于DefaultMQProducerImpl中, 无论是同步, 异步, 还是单向, 最后调用的都是sendDefaultImpl方法。
- makeSureStateOK方法, 确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。
- 检查消息的合法性。
- 调用tryToFindTopicPublishInfo方法, 尝试查找消息的一个topic路由, 进行发送消息。
- 计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。
- 调用selectOneMessageQueue方法, 选择一个队列MessageQueue, 支持失败故障转移。
- 调用sendKernelImpl方法发送消息, 同步、异步、单向发送消息都是这个方法实现的。
- 调用updateFaultItem方法, 更新本地错误表缓存数据, 用于延迟时间的故障转移的功能。
- 根据发送模式的不同, 如果是异步或者单向发送则直接返回, 如果是同步的话, 如果开启了retryAnotherBrokerWhenNotStoreOK, 那么如果返回值不返回SEND_OK状态, 则重新执行。
- 此过程中, 如果抛出RemotingException、MQClientException、以及MQBrokerException异常, 那么会重试, 如果抛出InterruptedException, 或者超时则不会重试。
/*** DefaultMQProducerImpl的方法** @param msg 方法* @param communicationMode 通信模式* @param sendCallback 回调方法* @param timeout 超时时间*/
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {/** 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常*/this.makeSureStateOK();/** 2 校验消息的合法性*/Validators.checkMessage(msg, this.defaultMQProducer);//生成本次调用idfinal long invokeID = random.nextLong();//开始时间戳long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;//结束时间戳long endTimestamp = beginTimestampFirst;/** 3 尝试查找消息的一个topic路由,用以发送消息*/TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//找到有效的topic信息if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;/** 4 计算发送消息的总次数* 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改*/int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;//记录每一次重试时候发送消息目标Broker名字的数组String[] brokersSent = new String[timesTotal];/** 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3*/for (; times < timesTotal; times++) {//上次使用过的broker,可以为空,表示第一次选择String lastBrokerName = null == mq ? null : mq.getBrokerName();/** 5 选择一个消息队列MessageQueue*/MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;//设置brokerNamebrokersSent[times] = mq.getBrokerName();try {//调用的开始时间beginTimestampPrev = System.currentTimeMillis();//如果还有可调用次数,那么if (times > 0) {//在重新发送期间用名称空间重置topicmsg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}//现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了long costTime = beginTimestampPrev - beginTimestampFirst;//如果已经超时了,那么直接结束循环,不再发送//即超时的时候,即使还剩下重试次数,也不会再继续重试if (timeout < costTime) {callTimeout = true;break;}/** 6 异步、同步、单向发送消息*/sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);//方法调用结束时间戳endTimestamp = System.currentTimeMillis();/** 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能*/this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);/** 8 根据发送模式执行不同的处理*/switch (communicationMode) {//异步和单向模式直接返回nullcase ASYNC:return null;case ONEWAY:return null;case SYNC://同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}//如果发送成功,则返回return sendResult;default:break;}} catch (RemotingException e) {//RemotingException异常,会执行重试endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {//MQClientException异常,会执行重试endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {//MQBrokerException异常endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;//如果返回的状态码属于一下几种,则支持重试://ResponseCode.TOPIC_NOT_EXIST,//ResponseCode.SERVICE_NOT_AVAILABLE,//ResponseCode.SYSTEM_ERROR,//ResponseCode.NO_PERMISSION,//ResponseCode.NO_BUYER_ID,//ResponseCode.NOT_IN_CURRENT_UNITif (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {//其他状态码不支持重试,如果有结果则返回,否则直接抛出异常if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {//InterruptedException异常,不会执行重试endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}/** 抛出异常的操作*/if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
2.1 makeSureStateOK确定生产者服务状态
确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。
/*** DefaultMQProducerImpl的方法*/
private void makeSureStateOK() throws MQClientException {//服务状态不是RUNNING,那么抛出MQClientException异常。if (this.serviceState != ServiceState.RUNNING) {throw new MQClientException("The producer service state not OK, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}
}
2.2 checkMessage校验消息的合法性
- 如果msg消息为null, 抛出异常。
- 校验topic, 如果topic为空, 或者长度大于127字符, 或者topic中有非法字符则抛出异常, 如果当前topic是不为允许使用的系统topic, 抛出异常。
- 校验消息体, 如果消息体为null, 或者为空数组, 或者消息字节数组长度大于4M, 抛出异常。
/*** Validators的方法*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {//如果消息为null,抛出异常if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}/** 校验topic*///如果topic为空,或者长度大于127个字符,或者topic的字符串不符合 "^[%|a-zA-Z0-9_-]+$"模式,即包含非法字符,那么抛出异常Validators.checkTopic(msg.getTopic());//如果当前topic是不为允许使用的系统topic SCHEDULE_TOPIC_XXXX,那么抛出异常Validators.isNotAllowedSendTopic(msg.getTopic());// body//如果消息体为null,那么抛出异常if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}//如果消息体为空数组,那么抛出异常if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}//如果消息 字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());}
}public static void checkTopic(String topic) throws MQClientException {//如果topic为空,那么抛出异常if (UtilAll.isBlank(topic)) {throw new MQClientException("The specified topic is blank", null);}//如果topic长度大于127个字符,那么抛出异常if (topic.length() > TOPIC_MAX_LENGTH) {throw new MQClientException(String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);}//如果topic字符串包含非法字符,那么抛出异常if (isTopicOrGroupIllegal(topic)) {throw new MQClientException(String.format("The specified topic[%s] contains illegal characters, allowing only %s", topic,"^[%|a-zA-Z0-9_-]+$"), null);}
}
2.3 tryToFindTopicPublishInfo查找topic的发布信息
该方法用于查找指定topic的发布信息TopicPublishInfo。
- 首先在本地缓存topicPublishInfoTable获取。
- updateTopicRouteInfoFromNameServer()获取, 从nameServer同步此topic的路由配置信息。
- 从nameServer同步topic数据。
/*** DefaultMQProducerImpl的方法* <p>* 查找指定topic的推送信息*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//尝试直接从producer的topicPublishInfoTable中获取topic信息TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//如果没有获取到有效信息,if (null == topicPublishInfo || !topicPublishInfo.ok()) {//那么立即创建一个TopicPublishInfothis.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//立即从nameServer同步此topic的路由配置信息,并且更新本地缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);//再次获取topicPublishInfotopicPublishInfo = this.topicPublishInfoTable.get(topic);}//如果找到的路由信息是可用的,直接返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}
- 首先在本地缓存topicPublishInfoTable获取, 如果没有获取到, 调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息, 并更新缓存。如果还是没有获取到有效数据, 再次从nameServer同步topic数据, 不过这次默认的topic是 “TBW102”去找路由配置信息作为本topic参数信息。
TopicPublishInfo: topic的发布信息
/*** 是否是顺序消息*/
private boolean orderTopic = false;
/*** 是否包含路由信息*/
private boolean haveTopicRouterInfo = false;
/*** topic的消息队列集合*/
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/*** 当前线程线程的消息队列的下标,循环选择消息队列使用+1*/
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/*** topic路由信息,包括topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable等属性*/
private TopicRouteData topicRouteData;
2.4 计算发送次数timesTotal
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 +
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。是MQClientAPIImpl#sendMessage方法中。
2.5 selectOneMessageQueue选择消息队列
方法内部调用mqFaultStrategy#selectOneMessageQueue方法:
/*** DefaultMQProducerImpl的方法** 选择一个消息队列* @param tpInfo topic信息* @param lastBrokerName 上次使用过的broker*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {//调用mqFaultStrategy#selectOneMessageQueue方法return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
mqFaultStrategy#selectOneMessageQueue方法支持故障转移机制:
- 首先判断是否开启了发送延迟故障转移机制, 即判断sendLatencyFaultEnable属性是否为true, 默认为false不开启。
- 如果开启的话, 首先仍然是遍历消息队列, 按照轮询的方式选取一个消息队列, 当消息队列可用时, 选择消息队列的工作就结束, 否则循环选择其他队列。如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中, 或者当前时间戳大于该broker下一次开始可用的时间戳, 则表示无障碍。
- 如果没有选出无故障mq, 那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker, 随后判断如果写队列数大于0, 那么选择该Broker。然后遍历消息队列, 采用取模的方式获取一个队列, 重置其brokerName和queueId, 进行消息发送。
- 如果上面的步骤抛出了异常, 那么遍历消息队列, 采用取模的方式获取一个队列。
- 如果没有开启延迟故障转移机制, 那么遍历消息队列, 采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列, 即不会再次选择上次发送失败的broker。如果没有找到一个不同broker的mq, 那么退回到轮询的方式。
selectOneMessageQueue方法选择mq的时候的故障转移机制, 目的是为了保证每次消息尽快的发送, 是一种高可用手段。
- 延迟时间的故障转移, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
- 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
/*** MQFaultStrategy的方法* <p>* 选择一个消息队列,支持故障延迟转移** @param tpInfo topic信息* @param lastBrokerName 上次使用过的broker*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {/** 判断是否开启了发送延迟故障转移机制,默认false不打开* 如果开启了该机制,那么每次选取topic下对应的queue时,会基于之前执行的耗时,在有存在符合条件的broker的前提下,优选选取一个延迟较短的broker,否则再考虑随机选取。*/if (this.sendLatencyFaultEnable) {try {//当前线程线程的消息队列的下标,循环选择消息队列使用+1int index = tpInfo.getSendWhichQueue().incrementAndGet();//遍历消息队列,采用取模的方式获取一个队列,即轮询的方式for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {//取模int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;//获取该消息队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//如果当前消息队列是可用的,即无故障,那么直接返回该mq//如果该broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间已经大于该broker下一次开始可用的时间点,表示无故障if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}//没有选出无故障的mq,那么一个不是最好的broker集合中随机选择一个final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//如果写队列数大于0,那么选择该brokerint writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {//遍历消息队列,采用取模的方式获取一个队列,即轮询的方式final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {//重置其brokerName,queueId,进行消息发送mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {//如果写队列数小于0,那么移除该brokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}//如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式return tpInfo.selectOneMessageQueue();}//如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式//获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的brokerreturn tpInfo.selectOneMessageQueue(lastBrokerName);
}
selectOneMessageQueue方法有两个重载方法, 一个无参的, 一个有参的。
无参的表示选择mq无限制:
/*** TopicPublishInfo的方法* <p>* 轮询的选择一个mq*/
public MessageQueue selectOneMessageQueue() {//获取下一个indexint index = this.sendWhichQueue.incrementAndGet();//取模计算索引int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;//获取该索引的mqreturn this.messageQueueList.get(pos);
}
有参数, 其参数是上一次发送失败的brokerName, 表示不会选择上一次失败的brokerName的mq。如果最后没有选择出来, 那么走轮询的逻辑。
/*** TopicPublishInfo的方法** @param lastBrokerName 上一次发送失败的brokerName*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//如果lastBrokerName为null,即第一次发送,那么轮询选择一个if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {//轮询选择一个mqint index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);//如果mq的brokerName不等于lastBrokerName,就返回,否则选择下一个if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}//没有选出来,那么轮询选择一个return selectOneMessageQueue();}
}
2.6 sendKernelImpl发送消息
选择了队列后, 调用sendKernelImpl发送消息
- 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Master broker地址。如果找不到, tryToFindTopicPublishInfo从nameServer远程拉取配置, 并更新本地缓存, 再次尝试获取Master broker地址。
- 调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。
- 如果不是批量消息, 那么设置唯一的uniqId。
- 如果不是批量消息, 且消息体大于4K, 那么压缩消息。
- 如果存在CheckForbiddenHook, 则执行checkForbidden钩子方法。如果存在SendMessageHook, 则执行sendMessageBefore钩子方法。
- 设置请求头信息SendMessageRequestHeader, 请求头包含各种基本属性, producerGroup, topic, queueId, 并且将消息重试次数和最大重试次数存入请求头中。
- 根据不同的发送模式发送消息, 如果是异步, 需要先克隆并还原消息, 最终异步, 同步, 单向都是调用MQClientAPIImpl#sendMessage方法发送消息的。
- 如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常, 判断如果存在SendMessageHook, 执行sendMessageAfter钩子方法。
- finally中对消息进行恢复。
/*** DefaultMQProducerImpl的方法* 发送消息** @param msg 消息* @param mq mq* @param communicationMode 发送模式* @param sendCallback 发送回调* @param topicPublishInfo topic信息* @param timeout 超时时间* @return 发送结果*/
private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//开始时间long beginStartTime = System.currentTimeMillis();/** 1 根据brokerName从brokerAddrTable中查找broker地址*/String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//如果本地找不到 broker 的地址if (null == brokerAddr) {/** 2 从nameServer远程拉取配置,并更新本地缓存* 该方法此前就学习过了*/tryToFindTopicPublishInfo(mq.getTopic());//再次获取地址brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {/** 3 vip通道判断*/brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating process/** 4 如果不是批量消息,那么尝试生成唯一uniqId,即UNIQ_KEY属性。MessageBatch批量消息在生成时就已经设置uniqId* uniqId也被称为客户端生成的msgId,从逻辑上代表唯一一条消息*/if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}/** 设置nameSpace为实例Id*/boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}//消息标识符int sysFlag = 0;//消息压缩标识boolean msgBodyCompressed = false;/** 5 尝试压缩消息*/if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//事务消息标志,prepare消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}/** 6 如果存在CheckForbiddenHook,则执行checkForbidden方法* 为什么叫禁止钩子呢,可能是想要使用者将不可发送消息的检查放在这个钩子函数里面吧(猜测)*/if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}/** 7 如果存在SendMessageHook,则执行sendMessageBefore方法*/if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}/** 8 设置请求头信息*/SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);//针对重试消息的处理if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {//获取消息重新消费次数属性值String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {//将重新消费次数设置到请求头中,并且清除该属性requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}//获取消息的最大重试次数属性值String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {//将最大重新消费次数设置到请求头中,并且清除该属性requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}/** 9 根据不同的发送模式,发送消息*/SendResult sendResult = null;switch (communicationMode) {/** 异步发送模式*/case ASYNC:/** 首先克隆并还原消息** 该方法的finally中已经有还原消息的代码了,为什么在异步发送消息之前,还要先还原消息呢?** 因为异步发送时 finally 重新赋值的时机并不确定,有很大概率是在第一次发送结束前就完成了 finally 中的赋值,* 因此在内部重试前 msg.body 大概率已经被重新赋值过,而 onExceptionImpl 中的重试逻辑 MQClientAPIImpl.sendMessageAsync 不会再对数据进行压缩,* 简言之,在异步发送的情况下,如果调用 onExceptionImpl 内部的重试,有很大概率发送的是无压缩的数据*/Message tmpMessage = msg;boolean messageCloned = false;//如果开启了消息压缩if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66//克隆一个messagetmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;//恢复原来的消息体msg.setBody(prevBody);}//如果topic整合了namespaceif (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}//还原topicmsg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}/** 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常*/long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}/** 10 发送异步消息*/sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;/** 单向、同步发送模式*/case ONEWAY:case SYNC:/** 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常*/long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}/** 10 发送单向、同步消息*/sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}/** 9 如果存在SendMessageHook,则执行sendMessageAfter方法*/if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}//返回执行结果return sendResult;//如果抛出了异常,如果存在SendMessageHook,则执行sendMessageAfter方法} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {/** 对消息进行恢复* 1、因为客户端可能还需要查看原始的消息内容,如果是压缩消息,则无法查看* 2、另外如果第一次压缩后消息还是大于4k,如果不恢复消息,那么客户端使用该message重新发送的时候,还会进行一次消息压缩*/msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
2.6.1 findBrokerAddressInPublish查找broker地址
根据brokerName向brokerAddrTable查找数据, 因为生产者只会向Master发送数据, 所以返回的是Master地址。
/*** MQClientInstance的方法*/
public String findBrokerAddressInPublish(final String brokerName) {//查询brokerAddrTable缓存的数据HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);//返回Mater节点的地址if (map != null && !map.isEmpty()) {return map.get(MixAll.MASTER_ID);}return null;
}
2.6.2 brokerVIPChannel判断vip通道
调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。
/*** MixAll的方法*/
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {//如果开启了vip通道if (isChange) {int split = brokerAddr.lastIndexOf(":");String ip = brokerAddr.substring(0, split);String port = brokerAddr.substring(split + 1);//重新拼接brokerAddr,其中port - 2String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);return brokerAddrNew;} else {//如果没有开启vip通道,那么返回原地址return brokerAddr;}
}
消费者拉取消息只能请求普通通道, 但是生产者可以选择vip和普通通道。
2.6.3 setUniqID生成uniqId
设置到UNIQ_KEY属性中, 批量消息在生成时就已经设置uniqId。
uniqId表示客户端生成的唯一一条消息。
/*** MessageClientIDSetter的方法*/
public static void setUniqID(final Message msg) {//如果这条消息不存在"UNIQ_KEY"属性,那么创建uniqId并且存入"UNIQ_KEY"属性中if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());}
}
2.6.4 tryToCompressMessage压缩消息
消息压缩, 压缩比为5, 压缩之后, 设置压缩标志, 批量消息不支持压缩, 消息压缩有利于网络传输数据。
/*** DefaultMQProducerImpl的方法*/
private boolean tryToCompressMessage(final Message msg) {//如果是批量消息,那么不进行压缩if (msg instanceof MessageBatch) {//batch dose not support compressing right nowreturn false;}byte[] body = msg.getBody();if (body != null) {//如果消息长度大于4Kif (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {try {//进行压缩,使用的JDK自带的压缩类byte[] data = UtilAll.compress(body, zipCompressLevel);if (data != null) {//重新设置到body中msg.setBody(data);return true;}} catch (IOException e) {log.error("tryToCompressMessage exception", e);log.warn(msg.toString());}}}return false;
}
2.7 updateFaultItem更新故障表
发送消息后, 无论正常与否, 调用updateFaultItem更新故障表, 更新本地错误表缓存数据, 用于延迟时间的故障转移功能。
故障转移功能在此前的selectOneMessageQueue方法中被使用到, 用于查找一个可用的消息队列, updateFaultItem方法判断是否开启了故障转移功能, 会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。
/*** DefaultMQProducerImpl的方法* @param brokerName brokerName* @param currentLatency 当前延迟* @param isolation 是否使用默认隔离*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {//调用MQFaultStrategy#updateFaultItem方法this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
MQFaultStrategy#updateFaultItem方法。其根据本次发送消息的延迟时间currentLatency, 计算出broker的隔离时间duration, 即是broker的下一个可用时间点。用于更新故障记录表。
/*** MQFaultStrategy的方法** @param brokerName brokerName* @param currentLatency 当前延迟* @param isolation 是否使用默认隔离时间*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {//如果开启了故障转移,即sendLatencyFaultEnable为true,默认falseif (this.sendLatencyFaultEnable) {//根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration//如果isolation为true,则使用默认隔离时间30000,即30slong duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新故障记录表this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}
2.7.1 computeNotAvailableDuration计算隔离时间
根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration, 据此即可以计算出该broker的下一个可用时间点。
//延迟等级
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时间等级
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};/*** MQFaultStrategy的方法** @param currentLatency 当前延迟* @return 故障延迟的时间*/
private long computeNotAvailableDuration(final long currentLatency) {//倒叙遍历latencyMaxfor (int i = latencyMax.length - 1; i >= 0; i--) {//选择broker延迟时间对应的broker不可用时间,默认30000对应的故障延迟的时间为600000,即10分钟if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}
latencyMax为延迟等级, notAvailableDuration为隔离时间。之间的关系:
2.7.2 updateFaultItem更新故障表
该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息, 将会设置发送消息的延迟时间currentLatency属性, 以及下一个可用时间点LatencyFaultToleranceImpl属性。
下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间, 在selectOneMessageQueue方法选取消息队列的时候, 如果开启了故障转移, 那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。
/*** LatencyFaultToleranceImpl的方法** @param name brokerName* @param currentLatency 当前延迟* @param notAvailableDuration 隔离时间(不可用时间)*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {//获取该broker此前的故障记录数据FaultItem old = this.faultItemTable.get(name);//如果此前没有数据,那么设置一个新对象肌凝乳if (null == old) {final FaultItem faultItem = new FaultItem(name);//设置当前延迟faultItem.setCurrentLatency(currentLatency);//设置下一次可用时间点faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);//已有故障记录,更新old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}} else {//已有故障记录,更新old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}
3.总结
-
生产者消息重试: RocketMQ的消费者消息重试和生产者消息重投。
-
生产者故障转移: 通过sendLatencyFaultEnable属性配置是否开启, 目的是为了保证每次发送消息成功, 是一种高可用手段。
- 延迟时间的故障转移, 需要sendLatencyFaultEnable为true, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
- 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
-
Vip通道: VIP通道用于隔离读写操作, 消费者拉取消息只能请求普通通道, 生产者可用选择vip或者普通通道。
-
故障转移表: RocketMQ的Producer生产者故障转移依赖于故障转移表实现, 是一个HasmMap。消息发送结束之后, 会根据本次发送消息的延迟时间currentLatency, 计算该broker的隔离时间duration, 即为broker的下一次可用时间点。然后更新故障记录表。故障转移表的key为brokerName, value为未来该broker可用时间。