[RocketMQ] Broker asyncSendMessage处理消息以及自动创建Topic (十)

asyncSendMessage方法用来处理来自producer发送的消息。

在这里插入图片描述

文章目录

      • 1.asyncSendMessage异步处理单条消息
      • 2.preSend准备响应命令对象
        • 2.1 msgCheck检查并自动创建topic
          • 2.1.1 createTopicInSendMessageMethod创建普通topic
          • 2.1.2 createTopicInSendMessageBackMethod创建重试topc
          • 2.1.3 autoCreateTopicEnable自动创建topic的问题
      • 3.handlePutMessageResultFuture处理消息存放结果

1.asyncSendMessage异步处理单条消息

  1. 调用preSend方法创建响应的命令对象, 包括自动创建topic的逻辑, 随后创建响应头对象。
  2. 随后创建MessageExtBrokerInner对象, 从请求中获取消息的属性并设置到对象属性中, 消息体, topic。
  3. 判断如果是重试或者死信消息, 调用handleRetryAndDLQ方法处理重试和死信队列消息, 如果已重试次数大于最大重试次数, 那么替换topic为死信队列topic, 消息会被发送至死信队列。
  4. 判断如果是事务准备消息, 并且不会拒绝处理事务消息, 调用asyncPrepareMessage方法以异步的方式处理存储事务准备消息。
  5. 如果是普通消息, 调用asyncPutMessage方法处理, 存储普通消息。asyncPutMessage以异步方式将消息存储到存储器中, 处理器可以处理下一个请求而不是等待结, 当结果完成时, 以异步方式通知客户端。
  6. 调用handlePutMessageResultFuture方法处理消息存储的处理结果。

在这里插入图片描述

/*** SendMessageProcessor的方法* <p>* 处理单条消息*/
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {/** 1 创建响应的命令对象,包括自动创建topic的逻辑*/final RemotingCommand response = preSend(ctx, request, requestHeader);//获取响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}//获取消息体final byte[] body = request.getBody();//获取队列idint queueIdInt = requestHeader.getQueueId();//从broker的topicConfigTable缓存中根据topicName获取TopicConfigTopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());//如果队列id小于0,则随机选择一个写队列索引作为idif (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}//构建消息对象,保存着要存入commitLog的数据MessageExtBrokerInner msgInner = new MessageExtBrokerInner();//设置topicmsgInner.setTopic(requestHeader.getTopic());//设置队列idmsgInner.setQueueId(queueIdInt);/** 2 处理重试和死信队列消息,将会对死信消息替换为死信topic*/if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}/** 设置一系列属性*/msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());//设置到properties属性中MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);//WAIT属性表示 消息发送时是否等消息存储完成后再返回if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.//不需要存储"WAIT=true"属性,从propertiesString中移除它,为每个消息节省9个字节。String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);//将没有WAIT属性的origProps存入msgInner的propertiesString属性msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later//将WAIT属性重新存入origProps集合中,因为msgInner.isWaitStoreMsgOK()稍后将被调用origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {//将没有WAIT属性的origProps存入msgInner的propertiesString属性msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;/** 处理事务消息逻辑*///TRAN_MSG属性值为true,表示为事务消息String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);//处理事务消息if (transFlag != null && Boolean.parseBoolean(transFlag)) {//判断是否需要拒绝事务消息,如果需要拒绝,则返回NO_PERMISSION异常if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}//调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息,底层仍是asyncPutMessage方法putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {//不是事务消息,那么调用asyncPutMessage方法处理,存储消息//以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}//处理消息存放的结果return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}

2.preSend准备响应命令对象

  • 创建响应的命令对象, 包括topic的校验和自动创建topic的逻辑。
  1. 创建RemotingCommand对象, 设置唯一请求id。
  2. 校验 当前时间是否小于broker的起始服务时间, 如果小于, 返回SYSTEM_ERROR, 表示不可用提供服务。
  3. msgCheck方法进行一系列的校验, 包括topic的自动创建逻辑。

在这里插入图片描述

/*** SendMessageProcessor的方法* <p>* 准备响应数据*/
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {//创建响应命令对象final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);//设置唯一id为请求idresponse.setOpaque(request.getOpaque());//添加扩展字段属性"MSG_REGION"、"TRACE_ON"response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));log.debug("Receive SendMessage request command {}", request);//获取配置的broker的处理请求的起始服务时间,默认为0final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();//如果当前时间小于起始时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务if (this.brokerController.getMessageStore().now() < startTimestamp) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));return response;}//设置code为-1response.setCode(-1);/** 消息校验,包括自动创建topic的逻辑*/super.msgCheck(ctx, requestHeader, response);if (response.getCode() != -1) {return response;}return response;
}

2.1 msgCheck检查并自动创建topic

  1. 校验如果当前broker有没有写的权限, 如果没有, 则broker会返回一个NO_PERMISSION异常。
    在这里插入图片描述

  2. 校验topic不能为空, 必须属于合法字符, 且长度不超过127字符。
    在这里插入图片描述在这里插入图片描述

  3. 校验如果当前topic是不为允许使用的系统topic, 抛出异常。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  4. 从broker的topicConfigTable缓存中根据topicName获取TopicConfig。
    在这里插入图片描述

    1. 如果不存在该topic信息, 调用createTopicInSendMessageMethod创建topic, 失败了, 判断是否重试, 如果是重试创建topic, 如果创建还是失败的话, 返回TOPIC_NOT_EXIST异常信息。
  5. 如果找到或者创建了topic, 校验queutId 不能大于等于该broker的读或写的最大queueId。
    在这里插入图片描述

/*** AbstractSendMessageProcessor的方法* <p>* 消息校验,包括自动创建topic的逻辑*/
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {//如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending message is forbidden");return response;}//校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {return response;}//校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXXif (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {return response;}//从broker的topicConfigTable缓存中根据topicName获取TopicConfigTopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());//如果不存在该topic信息if (null == topicConfig) {int topicSysFlag = 0;if (requestHeader.isUnitMode()) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);} else {topicSysFlag = TopicSysFlag.buildSysFlag(true, false);}}log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());/** 尝试创建普通topic*/topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);/** 尝试创建重试topic*/if (null == topicConfig) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);}}if (null == topicConfig) {response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}}//校验queutId 不能大于等于该broker的读或写的最大数量int queueIdInt = requestHeader.getQueueId();int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());if (queueIdInt >= idValid) {String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}return response;
}
2.1.1 createTopicInSendMessageMethod创建普通topic

在这里插入图片描述

  • 创建一个新的topic
  1. 获取锁防止并发创建相同的topic, 获取锁之后, 再次尝试从topicConfigTable获取topic信息, 如果获取到了, 直接返回, 如果没有, 创建topic。
    在这里插入图片描述

  2. 获取默认topic的信息, 作为创建新topic的模板, 默认topic实际上就是TBW102, 其有8个读写队列, 权限为读写并且可继承,即7。
    在这里插入图片描述

  3. 如果默认topic就是TBW102, 并且如果broker配置不支持自动创建topic, 即autoCreateTopicEnable为false, 设置权限为可读写, 不可继承。
    在这里插入图片描述
    在这里插入图片描述

  4. 如果默认topic配置的权限包括可继承, 从默认topic继承属性创建新topic。创建一个TopicConfig, 选择默认队列数量与默认topic写队列中数量小的值作为新topic的读写队列数量, 默认为4, 设置权限, 去除可继承权限。
    在这里插入图片描述

  5. 如果topic不是null, 则表示创建了topic, 将新的topic信息存入topicConfigTable缓存中, 生成下一个数据版本。标识位置为true, 调用persist方法将topic配置持久化到配置文件 {user.home}/store/config/topics.json中。
    在这里插入图片描述

  6. 解锁, 判断如果创建了新topic, 调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
    在这里插入图片描述

  /*** TopicConfigManager的方法* <p>* 创建普通topic,并持久化至配置文件 {user.home}/store/config/topics.json中** @param topic                       待创建topic* @param defaultTopic                默认topic,用于作为模板创建新topic* @param remoteAddress               远程地址* @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
*                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置* @param topicSysFlag                topic标识* @return topic配置*/public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {TopicConfig topicConfig = null;boolean createNew = false;try {//需要加锁防止并发创建相同的topicif (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {//再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;//获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);if (defaultTopicConfig != null) {//如果默认topic就是TBW102if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {//如果broker配置不支持自动创建topic,那么设置权限为可读写,不可继承,即6if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);}}//如果默认topic配置的权限包括可继承,那么从默认topic继承属性if (PermName.isInherited(defaultTopicConfig.getPerm())) {//创建topic配置topicConfig = new TopicConfig(topic);//选择默认队列数量与默认topic写队列数中最小的值作为新topic的读写队列数量,默认为4int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());if (queueNums < 0) {queueNums = 0;}topicConfig.setReadQueueNums(queueNums);topicConfig.setWriteQueueNums(queueNums);//权限int perm = defaultTopicConfig.getPerm();//去掉可继承权限perm &= ~PermName.PERM_INHERIT;topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());} else {log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);}} else {log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",defaultTopic, remoteAddress);}//如果topic不为null,说明创建了新topicif (topicConfig != null) {log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",defaultTopic, topicConfig, remoteAddress);//将新的topic信息存入topicConfigTable缓存中this.topicConfigTable.put(topic, topicConfig);//生成下一个数据版本this.dataVersion.nextVersion();//标识位置为truecreateNew = true;/** 将topic配置持久化到配置文件 {user.home}/store/config/topics.json中*/this.persist();}} finally {//解锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageMethod exception", e);}//如果创建了新topic,那么马上向nameServer注册当前broker的新配置路由信息if (createNew) {this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;}
2.1.2 createTopicInSendMessageBackMethod创建重试topc
  • 自动创建重试topic, 源码和创建普通topic类似,. 不同的是重试topic不需要模板topic, 默认读写队列都是1, 权限为读写。
/*** TopicConfigManager的方法* <p>* 创建重试topic,并持久化至配置文件 {user.home}/store/config/topics.json中** @param topic                       待创建topic* @param perm                        权限* @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4*                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置* @param topicSysFlag                topic标识* @return topic配置*/
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {//尝试获取topicTopicConfig topicConfig = this.topicConfigTable.get(topic);//如果存在则直接返回if (topicConfig != null)return topicConfig;boolean createNew = false;try {//需要加锁防止并发创建相同的topicif (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {//再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;//创建topictopicConfig = new TopicConfig(topic);//重试topic的默认读写队列数量为1topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);//重试topic的默认权限为读写topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);this.topicConfigTable.put(topic, topicConfig);createNew = true;//获取下一个版本this.dataVersion.nextVersion();//持久化broker信息this.persist();} finally {//解锁this.topicConfigTableLock.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}if (createNew) {//注册broker信息this.brokerController.registerBrokerAll(false, true, true);}return topicConfig;
}
2.1.3 autoCreateTopicEnable自动创建topic的问题

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • Producer发送消息源码, 客户端发送信息前, 会选择一个topic所在的broker地址, 如果topic不存在, 那么会默认topic的路由信息中的一个broker发送。

  • 信息发送到broker后, 会发送没有指定的topic并且如果broker的autoCreateTopicEnable为true的话, 会走createTopicInSendMessageMethod方法, 会自动创建topic方法的最后马上调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。

  • 生产者客户端会定时每30s从nameServer更新路由数据, 如果此时有其他的producer的存在, 并且刚好从nameServer获取到了这个新的topic的路由信息, 假设其他producer也需要向该topic发送信息, 由于发现topic路由信息已存在, 并且只存在于刚才那一个broker中, 此时这些producer都会将该topic的消息发送到这一个broker中来。

  • 接下来所有的Producer都只会向这一个Broker发送消息, 其他broker不再有机会创建新topic。本想topic在所有broker创建, 但是只有一个broker有topic信息, 违背了RocketMQ集群实现高可用的本事。

  • 所以RocketMQ官方建议生产环境下将broker的autoCreateTopicEnable设置为false, 闭自动创建topic, 改为手动在每个broker上创建。

3.handlePutMessageResultFuture处理消息存放结果

存放消息后, 调用handlePutMessageResult方法。

在这里插入图片描述

/*** SendMessageProcessor的方法* <p>* 处理消息存放结果** @param putMessageResult   存放结果* @param response           响应对象* @param request            请求对象* @param msgInner           内部消息对象* @param responseHeader     响应头* @param sendMessageContext 发送消息上下文* @param ctx                连接上下文* @param queueIdInt         queueId* @return*/
private CompletableFuture<RemotingCommand> handlePutMessageResultFuture(CompletableFuture<PutMessageResult> putMessageResult,RemotingCommand response,RemotingCommand request,MessageExt msgInner,SendMessageResponseHeader responseHeader,SendMessageContext sendMessageContext,ChannelHandlerContext ctx,int queueIdInt) {//阻塞,当从存放消息完毕时,执行后续的操作,即执行handlePutMessageResult方法return putMessageResult.thenApply((r) ->handlePutMessageResult(r, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt));
}

handlePutMessageResult处理存放消息的结果, 响应写回给客户端。

/*** SendMessageProcessor的方法* <p>* 处理存放消息的结果** @param putMessageResult   存放结果* @param response           响应对象* @param request            请求对象* @param msg                内部消息对象* @param responseHeader     响应头* @param sendMessageContext 发送消息上下文* @param ctx                连接上下文* @param queueIdInt         queueId* @return*/
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,RemotingCommand request, MessageExt msg,SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,int queueIdInt) {//结果为null,那么直接返回系统异常if (putMessageResult == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store putMessage return null");return response;}boolean sendOK = false;//解析存放消息状态码,转换为对应的响应码switch (putMessageResult.getPutMessageStatus()) {// Successcase PUT_OK:sendOK = true;response.setCode(ResponseCode.SUCCESS);break;case FLUSH_DISK_TIMEOUT:response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);sendOK = true;break;case FLUSH_SLAVE_TIMEOUT:response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);sendOK = true;break;case SLAVE_NOT_AVAILABLE:response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);sendOK = true;break;// Failedcase CREATE_MAPEDFILE_FAILED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("create mapped file failed, server is busy or broken.");break;case MESSAGE_ILLEGAL:case PROPERTIES_SIZE_EXCEEDED:response.setCode(ResponseCode.MESSAGE_ILLEGAL);response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");break;case SERVICE_NOT_AVAILABLE:response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);response.setRemark("service not available now. It may be caused by one of the following reasons: " +"the broker's disk is full [" + diskUtil() + "], messages are put to the slave, message store has been shut down, etc.");break;case OS_PAGECACHE_BUSY:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");break;case LMQ_CONSUME_QUEUE_NUM_EXCEEDED:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");break;case UNKNOWN_ERROR:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR");break;default:response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UNKNOWN_ERROR DEFAULT");break;}String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);//如果发送成功if (sendOK) {//如果topic是SCHEDULE_TOPIC_XXXX,即延迟消息的topicif (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msg.getTopic())) {//增加统计计数this.brokerController.getBrokerStatsManager().incQueuePutNums(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);this.brokerController.getBrokerStatsManager().incQueuePutSize(msg.getTopic(), msg.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());}//增加统计计数this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());response.setRemark(null);//设置响应头中的migId,实际上就是broker生成的offsetMsgId属性responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());//消息队列IdresponseHeader.setQueueId(queueIdInt);//消息逻辑偏移量responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());//如果不是单向请求,那么将响应写会客户端doResponse(ctx, request, response);//如果有发送消息的钩子,那么执行if (hasSendMessageHook()) {sendMessageContext.setMsgId(responseHeader.getMsgId());sendMessageContext.setQueueId(responseHeader.getQueueId());sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}return null;} else {//如果有发送消息的钩子,那么执行if (hasSendMessageHook()) {int wroteSize = request.getBody().length;int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);sendMessageContext.setCommercialSendTimes(incValue);sendMessageContext.setCommercialSendSize(wroteSize);sendMessageContext.setCommercialOwner(owner);}}return response;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/7172.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP三次和四次握手:

内容来自思学堂&#xff1a; TCP三次握手&#xff1a;确保双方都在线上 TCP四次握手&#xff1a;处理客户端要断开连接的需求

React修改Antd组件的样式

修改默认的antd组件&#xff0c;需要使用global import React, { useState, useEffect } from react; import { Tabs, Rate, Steps } from antd; import styles from ./index.less;const Index (props) >{return (<div className{styles.class_steps}><Stepsprog…

没有BuildConfig

Android Gradle 插件8.0.0&#xff08;2023年4月&#xff09; 刚刚发现&#xff0c;新创建的一个Android项目&#xff0c;成功运行到手机上了&#xff0c;然后在代码中想使用一下BuildConfig这个类&#xff0c;发现没有&#xff0c;按以前的经验&#xff0c;项目刚创建时Build…

电子企业MES管理系统如何加强生产现场管控

随着数字化时代的到来&#xff0c;中小型电子企业面临着提升生产现场管控的挑战。为了应对这一挑战&#xff0c;许多企业开始采用MES生产管理系统。本文将探讨MES生产管理系统如何帮助电子企业加强生产现场管控&#xff0c;提高生产效率和质量。 在电子企业的生产现场&#xf…

【零基础入门学习Python---Python中Web开发基础之快速入门实践】

&#x1f680; 零基础入门学习Python&#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜…

相对位置编码(relative position representation)

最近在看wenet项目时&#xff0c;发现其用的是相对位置编码。同时在做tts时&#xff0c;发现其效果还可以&#xff0c;但是就是对于长文本的生成效果不好&#xff0c;一直在思考是什么原因导致的&#xff0c;有想到最有可能是fastspeech是的绝对位置编码问题&#xff0c;所以还…

计算机网络

计算机网络 学习路径规划Cisco Packet TracerCisco Packet Tracer下载和安装探究集线器性质访问Web服务器&#xff08;加深对网络体系结构的理解&#xff09;_一台主机访问一个Web服务器&#xff0c;HTTP协议 MAC帧交换机网桥网桥和交换机的区别 学习路径规划 用cisco的课程和…

CV多模态和AIGC的原理解析:从CLIP、BLIP到Stable Diffusion、Midjourney

前言 终于开写本CV多模态系列的核心主题&#xff1a;stable diffusion相关的了&#xff0c;为何执着于想写这个stable diffusion呢&#xff0c;源于三点 去年stable diffusion和midjourney很火的时候&#xff0c;就想写&#xff0c;因为经常被刷屏&#xff0c;但那会时间错不…

OpenCV(图像处理)-基于python-滤波器(低通、高通滤波器的使用方法)

1.概念介绍2. 图像卷积filter2D() 3. 低通滤波器3.1 方盒滤波和均值滤波boxFilter()blur() 3.2 高斯滤波&#xff08;高斯噪音&#xff09;3.3 中值滤波&#xff08;胡椒噪音&#xff09;3.4 双边滤波 4. 高通滤波器4.1Sobel&#xff08;索贝尔&#xff09;&#xff08;高斯&am…

查询Oracle当前用户下,所有数据表的总条数

1. 需求 查询Oracle当前用户下&#xff0c;所有数据表的总条数 2.方法 存储过程 3. 操作 3.1 新建 右键点击Procedures&#xff0c;点击New 点击OK 把存储过程写进去&#xff0c;然后点击编译运行&#xff1a; create or replace procedure tables_count ist_count n…

uni-App踩坑记录

​ 1、uni自己封装的axios在真机中失效&#xff0c;发不出请求 uniapp中使用axios 需要配置适配器 (添加适配器有点费劲&#xff0c;直接封装uni自带请求也可以) axios-adapter-uniapp传送门 axios.defaults.adapter function(config) { //自己定义个适配器&#xff0c;用来…

WIN11系统安装MySql8.0.15详细安装

一.下载mysql8.015数据库 下载地址&#xff1a; 如下图所示 此处下载的是8.0.15版本&#xff0c;免安装版&#xff0c;系统为64位系统&#xff1a; 二&#xff0c;配置mysql环境变量: D:\program_file_worker\mysql8.15\mysql-8.0.15-winx64\bin 三. 环境配置完成后&#xff…