RocketMQ源码 发送消息源码分析

前言

DefaultMQProducer 是默认生产者组件,是生产者客户端中,绝大部分关于生产者和broker、nameSrv进行网络通信的功能入口。其中,包含发送各种形式(同步、异步、事务、顺序)的消息,针对发送消息部分的实现,主要是封装好消息协议后,从NameSrv获取当前Topic路由信息,轮询悬着一个队列,通过RemotingClient 客户端,将请求发送给messageQueue所在的broker的主节点。

broker主节点收到消息后,SendMessageProcessor 发送消息处理组件处理请求,将消息写入 DledgerCommitLog组件。在Dledger模式下,由Dledger 服务节点完成CommitLog 内存映射文件的写入和 同步消息到同一个broker组的其他高可用节点。 

同时broker节点的DefaultMessageStore 消息存储组件中,存在一个后台线程,定时查询CommitLog 组件中日志,进行消息重分,如:将消息分发到消费队列 CommitLogDispatcherBuildConsumeQueue 和 构建索引文件  CommitLogDispatcherBuildIndex 。

源码版本:4.9.3

源码架构图

源码分析

发送消息代码入口

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)

    /*** Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>** <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.** @param msg Message to send.* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.* @throws MQClientException if there is any client error.* @throws RemotingException if there is any network-tier error.* @throws MQBrokerException if there is any error with broker.* @throws InterruptedException if the sending thread is interrupted.*/// 发送消息入口,同步等待结果@Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);}

发送消息核心实现函数

代码调用较多,可以参考源码结构图,只看调用部分。下面一步一步贴出来调用链路。

    // 发送消息private SendResult sendDefaultImpl(Message msg, // 消息协议final CommunicationMode communicationMode, // 通信模式:同步、异步final SendCallback sendCallback, // 发送回调final long timeout // 超时时间) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 校验参数, 确保服务状态OKthis.makeSureStateOK();// 检查消息Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 尝试获取topic发布路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 同步发送,重试1次,异步发送,重试2次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];// 消息发送重试for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();// 选择一个消息队列,轮询模式MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}// 处理时间超时直接结束long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 重要 发送消息核心实现函数sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;// 同步发送,处理发送结果case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();// 异常处理,更新故障项this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();// 异常处理,更新故障项this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();// 异常处理,更新故障项this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;// 如果broker返回的不是重试的响应码,则抛出异常,是的话,继续重试if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();// 异常处理,更新故障项this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}
// 发送消息核心实现private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();// 从已发布的broker集群中选择 master brokerString brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {// 如果不存在broker分组,则尝试从nameserver获取tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {// 非批量消息,设置唯一IDMessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}// 检查消息是否被禁止if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}// 执行发送消息钩子if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}// 构建请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:// 执行前判断以下是否已经超时long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {// 上下文中设置发送结果context.setSendResult(sendResult);// 执行发送消息后的hookthis.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

MQClientAPIImpl.sendMessage()

   // 发送消息public SendResult sendMessage(final String addr, // broker地址final String brokerName, // broker分组名称final Message msg, // 消息final SendMessageRequestHeader requestHeader,// 请求头final long timeoutMillis, // 超时时间final CommunicationMode communicationMode, // 通信模式,同步、异步、onewayfinal SendCallback sendCallback, // 发送回调final TopicPublishInfo topicPublishInfo, // TopicPublishInfofinal MQClientInstance instance, // 客户端实例final int retryTimesWhenSendFailed, // 发送失败重试次数final SendMessageContext context, // 发送上下文final DefaultMQProducerImpl producer // 发送者实例) throws RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();RemotingCommand request = null;String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);// 如果是回复消息,则使用特殊的请求命令if (isReply) {if (sendSmartMsg) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);} else {request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);}} else {if (sendSmartMsg || msg instanceof MessageBatch) {SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);} else {// 重要 发送消息请求code SEND_MESSAGErequest = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);}}request.setBody(msg.getBody());// 根据通信模式进行处理,选择对应的请求方式switch (communicationMode) {case ONEWAY:this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;case ASYNC:final AtomicInteger times = new AtomicInteger();long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeAsync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeSync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);default:assert false;break;}return null;}

调用Netty网络通信客户端组件 remotingClient.invokeSync(),同步发送消息给broker节点。

    // 同步发送消息private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {// 调用netty网络通信客户端发送同步消息RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response, addr);}

broker 发送消息处理器组件入口

// 发送消息处理器
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {private List<ConsumeMessageHook> consumeMessageHookList;public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);}@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {RemotingCommand response = null;try {// 异步处理请求response = asyncProcessRequest(ctx, request).get();} catch (InterruptedException | ExecutionException e) {log.error("process SendMessage error, request : " + request.toString(), e);}return response;}
}

处理 异步发送消息请求

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:// 同步解析请求头SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}// 构建消息上下文mqtraceContext = buildMsgContext(ctx, requestHeader);// 执行发送前钩子this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// 异步执行发送消息return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}

调用消息存储组件,异步写入DefaultMessageStore存储组件。

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {// 预处理封装响应命令final RemotingCommand response = preSend(ctx, request, requestHeader);// 读取响应头final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}// 提取请求bodyfinal byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();// 查询topic 元数据TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());// 如果没有指定队列,则随机选择一个队列if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}// 封装成broker内部消息MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());MessageAccessor.setProperties(msgInner, origProps);msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked laterorigProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);} else {msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));}CompletableFuture<PutMessageResult> putMessageResult = null;String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}// 事务消息putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 普通消息,调用消息存储组件,异步写消息putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

消息写入DLedgerCommitLog 

org.apache.rocketmq.store.dledger.DLedgerCommitLog#asyncPutMessage

    // 异步写入消息@Overridepublic CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {// 存储统计服务StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();// 事务类型final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());setMessageInfo(msg, tranType);final String finalTopic = msg.getTopic();// Back to ResultsAppendMessageResult appendResult;AppendFuture<AppendEntryResponse> dledgerFuture;EncodeResult encodeResult;// 消息进行编码encodeResult = this.messageSerializer.serialize(msg);if (encodeResult.status != AppendMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));}// 写入锁定,可重入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configlong elapsedTimeInLock;long queueOffset;try {beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);encodeResult.setQueueOffsetKey(queueOffset, false);// 封装追加数据项请求AppendEntryRequest request = new AppendEntryRequest();request.setGroup(dLedgerConfig.getGroup());request.setRemoteId(dLedgerServer.getMemberState().getSelfId());request.setBody(encodeResult.getData());// 追加消息到高可用服务节点dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);if (dledgerFuture.getPos() == -1) {return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));}long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);switch (tranType) {case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// The next update ConsumeQueue informationDLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);break;default:break;}} catch (Exception e) {log.error("Put message error", e);return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));} finally {beginTimeInDledgerLock = 0;putMessageLock.unlock();}if (elapsedTimeInLock > 500) {log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);}return dledgerFuture.thenApply(appendEntryResponse -> {PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {case SUCCESS:putMessageStatus = PutMessageStatus.PUT_OK;break;case INCONSISTENT_LEADER:case NOT_LEADER:case LEADER_NOT_READY:case DISK_FULL:putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;break;case WAIT_QUORUM_ACK_TIMEOUT://Do not return flush_slave_timeout to the client, for the ons client will ignore it.putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;break;case LEADER_PENDING_FULL:putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;break;}PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult);if (putMessageStatus == PutMessageStatus.PUT_OK) {// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).add(1);storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).add(appendResult.getWroteBytes());}return putMessageResult;});}

消息分发服务线程 ReputMessageService

主要负责将broker中的commitlog中的消息,分发到consumeQueue、indexFile等文件。

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService

    // 重分发消息服务线程class ReputMessageService extends ServiceThread {// 重分发消息的起始offsetprivate volatile long reputFromOffset = 0;public long getReputFromOffset() {return reputFromOffset;}public void setReputFromOffset(long reputFromOffset) {this.reputFromOffset = reputFromOffset;}@Overridepublic void shutdown() {for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {try {Thread.sleep(100);} catch (InterruptedException ignored) {}}if (this.isCommitLogAvailable()) {log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);}super.shutdown();}public long behind() {return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;}private boolean isCommitLogAvailable() {return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();}private void doReput() {// 如果重分发的起始offset小于最小的offset,则重置为最小的offsetif (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();}// 遍历读取commitLog日志for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}// 读取重分发起始偏移量开始的一段内存映射区域SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();// 遍历读取到的内存映射文件for (int readSize = 0; readSize < result.getSize() && doNext; ) {// 校验分发请求DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {// 校验成功,开始分发请求,将commitlog中的消息分发到消息队列和构建索引文件DefaultMessageStore.this.doDispatch(dispatchRequest);if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()&& DefaultMessageStore.this.messageArrivingListener != null) {// 消费者使用, 如果是主节点,且长轮询模式,且存在消息到达监听器,则给监听器发通知DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());notifyMessageArrive4MultiQueue(dispatchRequest);}this.reputFromOffset += size;readSize += size;// 从节点,进行消息统计if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).add(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;// If user open the dledger pattern or the broker is master node,// it will not ignore the exception and fix the reputFromOffset variableif (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}}}
}

分发构建消费队列

    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {// 事务类型final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {// 非事务和提交事务,写入消息到消费队列case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:DefaultMessageStore.this.putMessagePositionInfo(request);break;// 预处理事务和回滚事务,不做处理case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}}

分发构建索引

    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {// 如果激活消息索引if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {// 构建消息索引文件DefaultMessageStore.this.indexService.buildIndex(request);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/325355.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业微信 get请求 设置可信域名

import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;RestController public class ValidController {GetMapping("/xxxxx.txt")public String getText() {//返回下载的txt里的内容return &…

真核微生物基因序列鉴定工具EukRep工具的安装和详细使用方法

介绍 EukRep是一种用于鉴定并分析环境中的真核微生物的工具。它基于16S rRNA基因序列&#xff0c;可以帮助研究人员确定和分类环境样品中存在的真核微生物群落。 EukRep 从宏基因组数据集中分类真核和原核序列 安装 要求Python3 推荐使用conda安装&#xff1a; $ conda cre…

OpenHarmony从入门到放弃(四)

设计一款使用Harmony开发的App 接下来我会通过设计并开发一款资讯类的App来入门OpenHarmony&#xff1b; 以下是我对App的设计想法&#xff1b; 一、模块划分 内容模块&#xff1a;App的核心模块&#xff0c;负责管理和展示资讯内容&#xff0c;具体包括内容获取与处理&…

【读书】《白帽子讲web安全》个人笔记Ⅰ-1

目录 前言&#xff1a; 第1章 我的安全世界观 1.1 Web安全简史 1.1.1中国黑客简史 1.1.2黑客技术的发展历程 1.1.3web安全的兴起 1.2黑帽子&#xff0c;白帽子 1.3返璞归真&#xff0c;揭秘安全的本质 1.4破除迷信&#xff0c;没有银弹 1.5安全三要素 1.6如何实施安…

透明OLED屏价格:影响因素与市场趋势

在当今的显示技术领域&#xff0c;透明OLED屏以其独特的透明特性和出色的显示效果&#xff0c;正逐渐成为市场的新宠。然而&#xff0c;对于许多消费者和企业来说&#xff0c;透明OLED屏的价格仍是关注的焦点。作为OLED透明屏市场部总监&#xff0c;我认为了解影响透明OLED屏价…

秋招复习之树

目录 前言 1 二叉树 二叉树常见术语 二叉树基本操作 初始化二叉树 插入与删除节点 常见二叉树类型 1. 完美二叉树 2. 完全二叉树 3. 完满二叉树 4. 平衡二叉树 二叉树的退化 2 二叉树遍历 层序遍历 代码实现 复杂度分析 前序、中序、后序遍历 复杂度分析 3 二叉树数组表示 表…

Vue CLI组件通信

目录 一、组件通信简介1.什么是组件通信&#xff1f;2.组件之间如何通信3.组件关系分类4.通信解决方案5.父子通信流程6.父向子通信代码示例7.子向父通信代码示例8.总结 二、props1.Props 定义2.Props 作用3.特点4.代码演示 三、props校验1.思考2.作用3.语法4.代码演示 四、prop…

LINUX服务器防火墙nf_conntrack问题一例

一、故障现象 业务反馈服务异常,无法响应请求&#xff0c;从系统日志 dmesg 或 /var/log/messages 看到大量以下记录&#xff1a;kernel: nf_conntrack: table full, dropping packet. 二、问题分析 业务高峰期服务器访问量大&#xff0c;内核 netfilter 模块 conntrack 相关参…

开启Android学习之旅-3-Android Activity

Android Activity 本文总结《第一行代码 Android》第3版的内容 环境&#xff1a; Android Studio Giraffe | 2022.3.1 Patch 3 Activity 是什么&#xff1f; Activity 简单将就是UI界面&#xff0c;包含两部分 Activity 类 和应用布局文件&#xff0c;如果是 Compose 则另说&…

[NSSRound#3 Team]This1sMysql

[NSSRound#3 Team]This1sMysql 源码 <?php show_source(__FILE__); include("class.php"); $conn new mysqli();if(isset($_POST[config]) && is_array($_POST[config])){foreach($_POST[config] as $key > $val){$value is_numeric($var)?(int)$…

几种常见的CSS三栏布局?介绍下粘性布局(sticky)?自适应布局?左边宽度固定,右边自适应?两种以上方式实现已知或者未知宽度的垂直水平居中?

几种常见的CSS三栏布局 流体布局 效果&#xff1a; 参考代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1…

【致远FAQ】V8.0_甘特图能不能实现行表头一级一级显示(树形结构)

问题描述 甘特图能不能实现行表头一级一级显示&#xff08;树形结构&#xff09; 问题解决 设置统计时把合并同类型和显示行合计都勾选上就可以了 效果参考