asyncSendMessage方法用来处理来自producer发送的消息。
文章目录
- 1.asyncSendMessage异步处理单条消息
- 2.preSend准备响应命令对象
- 2.1 msgCheck检查并自动创建topic
- 2.1.1 createTopicInSendMessageMethod创建普通topic
- 2.1.2 createTopicInSendMessageBackMethod创建重试topc
- 2.1.3 autoCreateTopicEnable自动创建topic的问题
- 3.handlePutMessageResultFuture处理消息存放结果
1.asyncSendMessage异步处理单条消息
- 调用preSend方法创建响应的命令对象, 包括自动创建topic的逻辑, 随后创建响应头对象。
- 随后创建MessageExtBrokerInner对象, 从请求中获取消息的属性并设置到对象属性中, 消息体, topic。
- 判断如果是重试或者死信消息, 调用handleRetryAndDLQ方法处理重试和死信队列消息, 如果已重试次数大于最大重试次数, 那么替换topic为死信队列topic, 消息会被发送至死信队列。
- 判断如果是事务准备消息, 并且不会拒绝处理事务消息, 调用asyncPrepareMessage方法以异步的方式处理存储事务准备消息。
- 如果是普通消息, 调用asyncPutMessage方法处理, 存储普通消息。asyncPutMessage以异步方式将消息存储到存储器中, 处理器可以处理下一个请求而不是等待结, 当结果完成时, 以异步方式通知客户端。
- 调用handlePutMessageResultFuture方法处理消息存储的处理结果。
/*** SendMessageProcessor的方法* <p>* 处理单条消息*/
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {/** 1 创建响应的命令对象,包括自动创建topic的逻辑*/final RemotingCommand response = preSend(ctx, request, requestHeader);//获取响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}//获取消息体final byte[] body = request.getBody();//获取队列idint queueIdInt = requestHeader.getQueueId();//从broker的topicConfigTable缓存中根据topicName获取TopicConfigTopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());//如果队列id小于0,则随机选择一个写队列索引作为idif (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}//构建消息对象,保存着要存入commitLog的数据MessageExtBrokerInner msgInner = new MessageExtBrokerInner();//设置topicmsgInner.setTopic(requestHeader.getTopic());//设置队列idmsgInner.setQueueId(queueIdInt);/** 2 处理重试和死信队列消息,将会对死信消息替换为死信topic*/if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}/** 设置一系列属性*/msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());//设置到properties属性中MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);//WAIT属性表示 消息发送时是否等消息存储完成后再返回if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.//不需要存储"WAIT=true"属性,从propertiesString中移除它,为每个消息节省9个字节。String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);//将没有WAIT属性的origProps存入msgInner的propertiesString属性msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later//将WAIT属性重新存入origProps集合中,因为msgInner.isWaitStoreMsgOK()稍后将被调用origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {//将没有WAIT属性的origProps存入msgInner的propertiesString属性msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;/** 处理事务消息逻辑*///TRAN_MSG属性值为true,表示为事务消息String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);//处理事务消息if (transFlag != null && Boolean.parseBoolean(transFlag)) {//判断是否需要拒绝事务消息,如果需要拒绝,则返回NO_PERMISSION异常if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}//调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息,底层仍是asyncPutMessage方法putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {//不是事务消息,那么调用asyncPutMessage方法处理,存储消息//以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}//处理消息存放的结果return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
2.preSend准备响应命令对象
- 创建响应的命令对象, 包括topic的校验和自动创建topic的逻辑。
- 创建RemotingCommand对象, 设置唯一请求id。
- 校验 当前时间是否小于broker的起始服务时间, 如果小于, 返回SYSTEM_ERROR, 表示不可用提供服务。
- msgCheck方法进行一系列的校验, 包括topic的自动创建逻辑。
/*** SendMessageProcessor的方法* <p>* 准备响应数据*/
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {//创建响应命令对象final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);//设置唯一id为请求idresponse.setOpaque(request.getOpaque());//添加扩展字段属性"MSG_REGION"、"TRACE_ON"response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);//获取配置的broker的处理请求的起始服务时间,默认为0final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();//如果当前时间小于起始时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}//设置code为-1response.setCode(-1);/** 消息校验,包括自动创建topic的逻辑*/super.msgCheck(ctx, requestHeader, response);if (response.getCode() != -1) {return response;}return response;
}
2.1 msgCheck检查并自动创建topic
-
校验如果当前broker有没有写的权限, 如果没有, 则broker会返回一个NO_PERMISSION异常。
-
校验topic不能为空, 必须属于合法字符, 且长度不超过127字符。
-
校验如果当前topic是不为允许使用的系统topic, 抛出异常。
-
从broker的topicConfigTable缓存中根据topicName获取TopicConfig。
- 如果不存在该topic信息, 调用createTopicInSendMessageMethod创建topic, 失败了, 判断是否重试, 如果是重试创建topic, 如果创建还是失败的话, 返回TOPIC_NOT_EXIST异常信息。
-
如果找到或者创建了topic, 校验queutId 不能大于等于该broker的读或写的最大queueId。
/*** AbstractSendMessageProcessor的方法* <p>* 消息校验,包括自动创建topic的逻辑*/
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {//如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending message is forbidden");return response;}//校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {return response;}//校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXXif (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {return response;}//从broker的topicConfigTable缓存中根据topicName获取TopicConfigTopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());//如果不存在该topic信息if (null == topicConfig) {int topicSysFlag = 0;if (requestHeader.isUnitMode()) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);} else {topicSysFlag = TopicSysFlag.buildSysFlag(true, false);}}log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());/** 尝试创建普通topic*/topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);/** 尝试创建重试topic*/if (null == topicConfig) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);}}if (null == topicConfig) {response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}}//校验queutId 不能大于等于该broker的读或写的最大数量int queueIdInt = requestHeader.getQueueId();int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());if (queueIdInt >= idValid) {String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}return response;
}
2.1.1 createTopicInSendMessageMethod创建普通topic
- 创建一个新的topic
-
获取锁防止并发创建相同的topic, 获取锁之后, 再次尝试从topicConfigTable获取topic信息, 如果获取到了, 直接返回, 如果没有, 创建topic。
-
获取默认topic的信息, 作为创建新topic的模板, 默认topic实际上就是TBW102, 其有8个读写队列, 权限为读写并且可继承,即7。
-
如果默认topic就是TBW102, 并且如果broker配置不支持自动创建topic, 即autoCreateTopicEnable为false, 设置权限为可读写, 不可继承。
-
如果默认topic配置的权限包括可继承, 从默认topic继承属性创建新topic。创建一个TopicConfig, 选择默认队列数量与默认topic写队列中数量小的值作为新topic的读写队列数量, 默认为4, 设置权限, 去除可继承权限。
-
如果topic不是null, 则表示创建了topic, 将新的topic信息存入topicConfigTable缓存中, 生成下一个数据版本。标识位置为true, 调用persist方法将topic配置持久化到配置文件 {user.home}/store/config/topics.json中。
-
解锁, 判断如果创建了新topic, 调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
/*** TopicConfigManager的方法* <p>* 创建普通topic,并持久化至配置文件 {user.home}/store/config/topics.json中** @param topic 待创建topic* @param defaultTopic 默认topic,用于作为模板创建新topic* @param remoteAddress 远程地址* @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
* 可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置* @param topicSysFlag topic标识* @return topic配置*/public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {TopicConfig topicConfig = null;boolean createNew = false;try {//需要加锁防止并发创建相同的topicif (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {//再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;//获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);if (defaultTopicConfig != null) {//如果默认topic就是TBW102if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {//如果broker配置不支持自动创建topic,那么设置权限为可读写,不可继承,即6if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);}}//如果默认topic配置的权限包括可继承,那么从默认topic继承属性if (PermName.isInherited(defaultTopicConfig.getPerm())) {//创建topic配置topicConfig = new TopicConfig(topic);//选择默认队列数量与默认topic写队列数中最小的值作为新topic的读写队列数量,默认为4int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());if (queueNums < 0) {queueNums = 0;}topicConfig.setReadQueueNums(queueNums);topicConfig.setWriteQueueNums(queueNums);//权限int perm = defaultTopicConfig.getPerm();//去掉可继承权限perm &= ~PermName.PERM_INHERIT;topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());} else {log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);}} else {log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",defaultTopic, remoteAddress);}//如果topic不为null,说明创建了新topicif (topicConfig != null) {log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",defaultTopic, topicConfig, remoteAddress);//将新的topic信息存入topicConfigTable缓存中this.topicConfigTable.put(topic, topicConfig);//生成下一个数据版本this.dataVersion.nextVersion();//标识位置为truecreateNew = true;/** 将topic配置持久化到配置文件 {user.home}/store/config/topics.json中*/this.persist();}} finally {//解锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageMethod exception", e);}//如果创建了新topic,那么马上向nameServer注册当前broker的新配置路由信息if (createNew) {this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;}
2.1.2 createTopicInSendMessageBackMethod创建重试topc
- 自动创建重试topic, 源码和创建普通topic类似,. 不同的是重试topic不需要模板topic, 默认读写队列都是1, 权限为读写。
/*** TopicConfigManager的方法* <p>* 创建重试topic,并持久化至配置文件 {user.home}/store/config/topics.json中** @param topic 待创建topic* @param perm 权限* @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4* 可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置* @param topicSysFlag topic标识* @return topic配置*/
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {//尝试获取topicTopicConfig topicConfig = this.topicConfigTable.get(topic);//如果存在则直接返回if (topicConfig != null)return topicConfig;boolean createNew = false;try {//需要加锁防止并发创建相同的topicif (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {//再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;//创建topictopicConfig = new TopicConfig(topic);//重试topic的默认读写队列数量为1topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);//重试topic的默认权限为读写topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);this.topicConfigTable.put(topic, topicConfig);createNew = true;//获取下一个版本this.dataVersion.nextVersion();//持久化broker信息this.persist();} finally {//解锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}if (createNew) {//注册broker信息this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;
}
2.1.3 autoCreateTopicEnable自动创建topic的问题
-
Producer发送消息源码, 客户端发送信息前, 会选择一个topic所在的broker地址, 如果topic不存在, 那么会默认topic的路由信息中的一个broker发送。
-
信息发送到broker后, 会发送没有指定的topic并且如果broker的autoCreateTopicEnable为true的话, 会走createTopicInSendMessageMethod方法, 会自动创建topic方法的最后马上调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
-
生产者客户端会定时每30s从nameServer更新路由数据, 如果此时有其他的producer的存在, 并且刚好从nameServer获取到了这个新的topic的路由信息, 假设其他producer也需要向该topic发送信息, 由于发现topic路由信息已存在, 并且只存在于刚才那一个broker中, 此时这些producer都会将该topic的消息发送到这一个broker中来。
-
接下来所有的Producer都只会向这一个Broker发送消息, 其他broker不再有机会创建新topic。本想topic在所有broker创建, 但是只有一个broker有topic信息, 违背了RocketMQ集群实现高可用的本事。
-
所以RocketMQ官方建议生产环境下将broker的autoCreateTopicEnable设置为false, 闭自动创建topic, 改为手动在每个broker上创建。
3.handlePutMessageResultFuture处理消息存放结果
存放消息后, 调用handlePutMessageResult方法。
/*** SendMessageProcessor的方法* <p>* 处理消息存放结果** @param putMessageResult 存放结果* @param response 响应对象* @param request 请求对象* @param msgInner 内部消息对象* @param responseHeader 响应头* @param sendMessageContext 发送消息上下文* @param ctx 连接上下文* @param queueIdInt queueId* @return*/
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> putMessageResult,RemotingCommand response,RemotingCommand request,MessageExt msgInner,SendMessageResponseHeader responseHeader,SendMessageContext sendMessageContext,ChannelHandlerContext ctx,int queueIdInt) {//阻塞,当从存放消息完毕时,执行后续的操作,即执行handlePutMessageResult方法return putMessageResult.thenApply((r) ->handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt));
}
handlePutMessageResult处理存放消息的结果, 响应写回给客户端。
/*** SendMessageProcessor的方法* <p>* 处理存放消息的结果** @param putMessageResult 存放结果* @param response 响应对象* @param request 请求对象* @param msg 内部消息对象* @param responseHeader 响应头* @param sendMessageContext 发送消息上下文* @param ctx 连接上下文* @param queueIdInt queueId* @return*/
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,RemotingCommand request, MessageExt msg,SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,int queueIdInt) {//结果为null,那么直接返回系统异常if (putMessageResult == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");return response;}boolean sendOK = false;//解析存放消息状态码,转换为对应的响应码switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:sendOK = true;response.setCode(ResponseCode.SUCCESS);break;case FLUSH_DISK_TIMEOUT:response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);sendOK = true;break;case FLUSH_SLAVE_TIMEOUT:response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);sendOK = true;break;case SLAVE_NOT_AVAILABLE:response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);sendOK = true;break;// Failedcase CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("create mapped file failed, server is busy or broken.");break;case MESSAGE_ILLEGAL:case PROPERTIES_SIZE_EXCEEDED:response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");break;case SERVICE_NOT_AVAILABLE:response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);response.setRemark("service not available now. It may be caused by one of the following reasons: " +"the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.");break;case OS_PAGECACHE_BUSY:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");break;case LMQ_CONSUME_QUEUE_NUM_EXCEEDED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");break;case UNKNOWN_ERROR:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR");break;default:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR DEFAULT");break;}String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);//如果发送成功if (sendOK) {//如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topicif (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {//增加统计计数this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());}//增加统计计数this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());response.setRemark(null);//设置响应头中的migId,实际上就是broker生成的offsetMsgId属性responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());//消息队列IdresponseHeader.setQueueId(queueIdInt);//消息逻辑偏移量responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());//如果不是单向请求,那么将响应写会客户端doResponse(ctx, request, response);//如果有发送消息的钩子,那么执行if (hasSendMessageHook()) {sendMessageContext.setMsgId(responseHeader.getMsgId());sendMessageContext.setQueueId(responseHeader.getQueueId());sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}return null;} else {//如果有发送消息的钩子,那么执行if (hasSendMessageHook()) {int wroteSize = request.getBody().length;int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}}return response;
}