[RocketMQ] Producer发送消息的总体流程 (七)

  • 单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。
  • 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。
  • 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服务器的响应调用回调。

文章目录

      • 1.send源码入口
        • 1.1 同步消息
        • 1.2 单向消息
        • 1.3 异步消息
      • 2.sendDefaultImpl发送消息实现
        • 2.1 makeSureStateOK确定生产者服务状态
        • 2.2 checkMessage校验消息的合法性
        • 2.3 tryToFindTopicPublishInfo查找topic的发布信息
        • 2.4 计算发送次数timesTotal
        • 2.5 selectOneMessageQueue选择消息队列
        • 2.6 sendKernelImpl发送消息
          • 2.6.1 findBrokerAddressInPublish查找broker地址
          • 2.6.2 brokerVIPChannel判断vip通道
          • 2.6.3 setUniqID生成uniqId
          • 2.6.4 tryToCompressMessage压缩消息
        • 2.7 updateFaultItem更新故障表
          • 2.7.1 computeNotAvailableDuration计算隔离时间
          • 2.7.2 updateFaultItem更新故障表
      • 3.总结

DefaultMQProducer提供了很多send方法的重载:

在这里插入图片描述
在这里插入图片描述

1.send源码入口

1.1 同步消息

在这里插入图片描述

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//调用defaultMQProducerImpl#send发送消息return this.defaultMQProducerImpl.send(msg);
}

defaultMQProducerImpl#send发送消息。
在这里插入图片描述

调用另一个send(), 超时时间为3s。

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//调用另一个send方法,设置超时时间参数,默认3000msreturn send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

在这里插入图片描述

/*** DefaultMQProducerImpl的方法** @param msg     消息* @param timeout 超时时间,毫秒值*/
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为nullreturn this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

该方法内部又调用另一个sendDefaultImpl方法, 设置消息发送方式为SYNC, 为同步, 设置回调函数为null。

1.2 单向消息

单向消息使用sendOneway发送。

在这里插入图片描述

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException{//根据namespace设置topicmsg.setTopic(withNamespace(msg.getTopic()));//调用defaultMQProducerImpl#sendOneway发送消息this.defaultMQProducerImpl.sendOneway(msg);
}

defaultMQProducerImpl#sendOneway。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {try {//调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000msthis.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());} catch (MQBrokerException e) {throw new MQClientException("unknown exception", e);}
}

最终调用sendDefaultImpl方法, 发送模式为ONEWAY, 设置回调函数为null, 超时时间为3s。

1.3 异步消息

异步消息使用带有callback函数的send方法发送。

在这里插入图片描述

public void send(Message msg, SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException {//根据namespace设置topicmsg.setTopic(withNamespace(msg.getTopic()));//调用defaultMQProducerImpl#send发送消息,带有sendCallback参数this.defaultMQProducerImpl.send(msg, sendCallback);
}

该方法内部调用defaultMQProducerImpl#send方法发送消息, 带sendCallback参数。

public void send(Message msg, SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {//该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)throws MQClientException, RemotingException, InterruptedException {//调用起始时间final long beginStartTime = System.currentTimeMillis();//获取异步发送执行器线程池ExecutorService executor = this.getAsyncSenderExecutor();try {/** 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息*/executor.submit(new Runnable() {@Overridepublic void run() {/** 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法*/long costTime = System.currentTimeMillis() - beginStartTime;if (timeout > costTime) {try {//调用sendDefaultImpl方法执行发送操作sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);} catch (Exception e) {//抛出异常,执行回调函数onException方法sendCallback.onException(e);}} else {//超时,执行回调函数onException方法sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));}}});} catch (RejectedExecutionException e) {throw new MQClientException("executor rejected ", e);}}

方法内部会获取获取异步发送执行器线程池, 使用线程池异步的执行sendDefaultImpl方法, 即异步发送。

发送之前计算超时时间, 如果超时则执行回调函数onException()。

2.sendDefaultImpl发送消息实现

该方法位于DefaultMQProducerImpl中, 无论是同步, 异步, 还是单向, 最后调用的都是sendDefaultImpl方法。

  1. makeSureStateOK方法, 确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。
  2. 检查消息的合法性。
  3. 调用tryToFindTopicPublishInfo方法, 尝试查找消息的一个topic路由, 进行发送消息。
  4. 计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。
  5. 调用selectOneMessageQueue方法, 选择一个队列MessageQueue, 支持失败故障转移。
  6. 调用sendKernelImpl方法发送消息, 同步、异步、单向发送消息都是这个方法实现的。
  7. 调用updateFaultItem方法, 更新本地错误表缓存数据, 用于延迟时间的故障转移的功能。
  8. 根据发送模式的不同, 如果是异步或者单向发送则直接返回, 如果是同步的话, 如果开启了retryAnotherBrokerWhenNotStoreOK, 那么如果返回值不返回SEND_OK状态, 则重新执行。
  9. 此过程中, 如果抛出RemotingException、MQClientException、以及MQBrokerException异常, 那么会重试, 如果抛出InterruptedException, 或者超时则不会重试。

在这里插入图片描述
在这里插入图片描述

/*** DefaultMQProducerImpl的方法** @param msg               方法* @param communicationMode 通信模式* @param sendCallback      回调方法* @param timeout           超时时间*/
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {/** 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常*/this.makeSureStateOK();/** 2 校验消息的合法性*/Validators.checkMessage(msg, this.defaultMQProducer);//生成本次调用idfinal long invokeID = random.nextLong();//开始时间戳long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;//结束时间戳long endTimestamp = beginTimestampFirst;/** 3 尝试查找消息的一个topic路由,用以发送消息*/TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//找到有效的topic信息if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;/** 4 计算发送消息的总次数* 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改*/int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;//记录每一次重试时候发送消息目标Broker名字的数组String[] brokersSent = new String[timesTotal];/** 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3*/for (; times < timesTotal; times++) {//上次使用过的broker,可以为空,表示第一次选择String lastBrokerName = null == mq ? null : mq.getBrokerName();/** 5 选择一个消息队列MessageQueue*/MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;//设置brokerNamebrokersSent[times] = mq.getBrokerName();try {//调用的开始时间beginTimestampPrev = System.currentTimeMillis();//如果还有可调用次数,那么if (times > 0) {//在重新发送期间用名称空间重置topicmsg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}//现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了long costTime = beginTimestampPrev - beginTimestampFirst;//如果已经超时了,那么直接结束循环,不再发送//即超时的时候,即使还剩下重试次数,也不会再继续重试if (timeout < costTime) {callTimeout = true;break;}/** 6 异步、同步、单向发送消息*/sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);//方法调用结束时间戳endTimestamp = System.currentTimeMillis();/** 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能*/this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);/** 8 根据发送模式执行不同的处理*/switch (communicationMode) {//异步和单向模式直接返回nullcase ASYNC:return null;case ONEWAY:return null;case SYNC://同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}//如果发送成功,则返回return sendResult;default:break;}} catch (RemotingException e) {//RemotingException异常,会执行重试endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {//MQClientException异常,会执行重试endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {//MQBrokerException异常endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;//如果返回的状态码属于一下几种,则支持重试://ResponseCode.TOPIC_NOT_EXIST,//ResponseCode.SERVICE_NOT_AVAILABLE,//ResponseCode.SYSTEM_ERROR,//ResponseCode.NO_PERMISSION,//ResponseCode.NO_BUYER_ID,//ResponseCode.NOT_IN_CURRENT_UNITif (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {//其他状态码不支持重试,如果有结果则返回,否则直接抛出异常if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {//InterruptedException异常,不会执行重试endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}/** 抛出异常的操作*/if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

2.1 makeSureStateOK确定生产者服务状态

确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。

在这里插入图片描述

/*** DefaultMQProducerImpl的方法*/
private void makeSureStateOK() throws MQClientException {//服务状态不是RUNNING,那么抛出MQClientException异常。if (this.serviceState != ServiceState.RUNNING) {throw new MQClientException("The producer service state not OK, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}
}

2.2 checkMessage校验消息的合法性

  1. 如果msg消息为null, 抛出异常。
  2. 校验topic, 如果topic为空, 或者长度大于127字符, 或者topic中有非法字符则抛出异常, 如果当前topic是不为允许使用的系统topic, 抛出异常。
  3. 校验消息体, 如果消息体为null, 或者为空数组, 或者消息字节数组长度大于4M, 抛出异常。
    在这里插入图片描述
/*** Validators的方法*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {//如果消息为null,抛出异常if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}/** 校验topic*///如果topic为空,或者长度大于127个字符,或者topic的字符串不符合 "^[%|a-zA-Z0-9_-]+$"模式,即包含非法字符,那么抛出异常Validators.checkTopic(msg.getTopic());//如果当前topic是不为允许使用的系统topic SCHEDULE_TOPIC_XXXX,那么抛出异常Validators.isNotAllowedSendTopic(msg.getTopic());// body//如果消息体为null,那么抛出异常if (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}//如果消息体为空数组,那么抛出异常if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}//如果消息 字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());}
}public static void checkTopic(String topic) throws MQClientException {//如果topic为空,那么抛出异常if (UtilAll.isBlank(topic)) {throw new MQClientException("The specified topic is blank", null);}//如果topic长度大于127个字符,那么抛出异常if (topic.length() > TOPIC_MAX_LENGTH) {throw new MQClientException(String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);}//如果topic字符串包含非法字符,那么抛出异常if (isTopicOrGroupIllegal(topic)) {throw new MQClientException(String.format("The specified topic[%s] contains illegal characters, allowing only %s", topic,"^[%|a-zA-Z0-9_-]+$"), null);}
}

2.3 tryToFindTopicPublishInfo查找topic的发布信息

该方法用于查找指定topic的发布信息TopicPublishInfo。

  1. 首先在本地缓存topicPublishInfoTable获取。
  2. updateTopicRouteInfoFromNameServer()获取, 从nameServer同步此topic的路由配置信息。
  3. 从nameServer同步topic数据。

在这里插入图片描述

/*** DefaultMQProducerImpl的方法* <p>* 查找指定topic的推送信息*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//尝试直接从producer的topicPublishInfoTable中获取topic信息TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//如果没有获取到有效信息,if (null == topicPublishInfo || !topicPublishInfo.ok()) {//那么立即创建一个TopicPublishInfothis.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());//立即从nameServer同步此topic的路由配置信息,并且更新本地缓存this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);//再次获取topicPublishInfotopicPublishInfo = this.topicPublishInfoTable.get(topic);}//如果找到的路由信息是可用的,直接返回if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
}
  • 首先在本地缓存topicPublishInfoTable获取, 如果没有获取到, 调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息, 并更新缓存。如果还是没有获取到有效数据, 再次从nameServer同步topic数据, 不过这次默认的topic是 “TBW102”去找路由配置信息作为本topic参数信息。

TopicPublishInfo: topic的发布信息

在这里插入图片描述

/*** 是否是顺序消息*/
private boolean orderTopic = false;
/*** 是否包含路由信息*/
private boolean haveTopicRouterInfo = false;
/*** topic的消息队列集合*/
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/*** 当前线程线程的消息队列的下标,循环选择消息队列使用+1*/
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/*** topic路由信息,包括topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable等属性*/
private TopicRouteData topicRouteData;

2.4 计算发送次数timesTotal

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + 
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。是MQClientAPIImpl#sendMessage方法中。

2.5 selectOneMessageQueue选择消息队列

方法内部调用mqFaultStrategy#selectOneMessageQueue方法:

在这里插入图片描述

/*** DefaultMQProducerImpl的方法** 选择一个消息队列* @param tpInfo topic信息* @param lastBrokerName 上次使用过的broker*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {//调用mqFaultStrategy#selectOneMessageQueue方法return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

mqFaultStrategy#selectOneMessageQueue方法支持故障转移机制:

  1. 首先判断是否开启了发送延迟故障转移机制, 即判断sendLatencyFaultEnable属性是否为true, 默认为false不开启。
    1. 如果开启的话, 首先仍然是遍历消息队列, 按照轮询的方式选取一个消息队列, 当消息队列可用时, 选择消息队列的工作就结束, 否则循环选择其他队列。如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中, 或者当前时间戳大于该broker下一次开始可用的时间戳, 则表示无障碍。
    2. 如果没有选出无故障mq, 那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker, 随后判断如果写队列数大于0, 那么选择该Broker。然后遍历消息队列, 采用取模的方式获取一个队列, 重置其brokerName和queueId, 进行消息发送。
    3. 如果上面的步骤抛出了异常, 那么遍历消息队列, 采用取模的方式获取一个队列。
  2. 如果没有开启延迟故障转移机制, 那么遍历消息队列, 采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列, 即不会再次选择上次发送失败的broker。如果没有找到一个不同broker的mq, 那么退回到轮询的方式。

selectOneMessageQueue方法选择mq的时候的故障转移机制, 目的是为了保证每次消息尽快的发送, 是一种高可用手段。

  1. 延迟时间的故障转移, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
  2. 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
/*** MQFaultStrategy的方法* <p>* 选择一个消息队列,支持故障延迟转移** @param tpInfo         topic信息* @param lastBrokerName 上次使用过的broker*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {/** 判断是否开启了发送延迟故障转移机制,默认false不打开* 如果开启了该机制,那么每次选取topic下对应的queue时,会基于之前执行的耗时,在有存在符合条件的broker的前提下,优选选取一个延迟较短的broker,否则再考虑随机选取。*/if (this.sendLatencyFaultEnable) {try {//当前线程线程的消息队列的下标,循环选择消息队列使用+1int index = tpInfo.getSendWhichQueue().incrementAndGet();//遍历消息队列,采用取模的方式获取一个队列,即轮询的方式for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {//取模int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;//获取该消息队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//如果当前消息队列是可用的,即无故障,那么直接返回该mq//如果该broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间已经大于该broker下一次开始可用的时间点,表示无故障if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}//没有选出无故障的mq,那么一个不是最好的broker集合中随机选择一个final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//如果写队列数大于0,那么选择该brokerint writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {//遍历消息队列,采用取模的方式获取一个队列,即轮询的方式final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {//重置其brokerName,queueId,进行消息发送mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {//如果写队列数小于0,那么移除该brokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}//如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式return tpInfo.selectOneMessageQueue();}//如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式//获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的brokerreturn tpInfo.selectOneMessageQueue(lastBrokerName);
}

selectOneMessageQueue方法有两个重载方法, 一个无参的, 一个有参的。

无参的表示选择mq无限制:

/*** TopicPublishInfo的方法* <p>* 轮询的选择一个mq*/
public MessageQueue selectOneMessageQueue() {//获取下一个indexint index = this.sendWhichQueue.incrementAndGet();//取模计算索引int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;//获取该索引的mqreturn this.messageQueueList.get(pos);
}

有参数, 其参数是上一次发送失败的brokerName, 表示不会选择上一次失败的brokerName的mq。如果最后没有选择出来, 那么走轮询的逻辑。

/*** TopicPublishInfo的方法** @param lastBrokerName 上一次发送失败的brokerName*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {//如果lastBrokerName为null,即第一次发送,那么轮询选择一个if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {//轮询选择一个mqint index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);//如果mq的brokerName不等于lastBrokerName,就返回,否则选择下一个if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}//没有选出来,那么轮询选择一个return selectOneMessageQueue();}
}

2.6 sendKernelImpl发送消息

选择了队列后, 调用sendKernelImpl发送消息

  1. 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Master broker地址。如果找不到, tryToFindTopicPublishInfo从nameServer远程拉取配置, 并更新本地缓存, 再次尝试获取Master broker地址。
  2. 调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。
  3. 如果不是批量消息, 那么设置唯一的uniqId。
  4. 如果不是批量消息, 且消息体大于4K, 那么压缩消息。
  5. 如果存在CheckForbiddenHook, 则执行checkForbidden钩子方法。如果存在SendMessageHook, 则执行sendMessageBefore钩子方法。
  6. 设置请求头信息SendMessageRequestHeader, 请求头包含各种基本属性, producerGroup, topic, queueId, 并且将消息重试次数和最大重试次数存入请求头中。
  7. 根据不同的发送模式发送消息, 如果是异步, 需要先克隆并还原消息, 最终异步, 同步, 单向都是调用MQClientAPIImpl#sendMessage方法发送消息的。
  8. 如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常, 判断如果存在SendMessageHook, 执行sendMessageAfter钩子方法。
  9. finally中对消息进行恢复。

在这里插入图片描述

在这里插入图片描述

/*** DefaultMQProducerImpl的方法* 发送消息** @param msg               消息* @param mq                mq* @param communicationMode 发送模式* @param sendCallback      发送回调* @param topicPublishInfo  topic信息* @param timeout           超时时间* @return 发送结果*/
private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//开始时间long beginStartTime = System.currentTimeMillis();/** 1 根据brokerName从brokerAddrTable中查找broker地址*/String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//如果本地找不到 broker 的地址if (null == brokerAddr) {/** 2 从nameServer远程拉取配置,并更新本地缓存* 该方法此前就学习过了*/tryToFindTopicPublishInfo(mq.getTopic());//再次获取地址brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {/** 3 vip通道判断*/brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating process/** 4 如果不是批量消息,那么尝试生成唯一uniqId,即UNIQ_KEY属性。MessageBatch批量消息在生成时就已经设置uniqId* uniqId也被称为客户端生成的msgId,从逻辑上代表唯一一条消息*/if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}/** 设置nameSpace为实例Id*/boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}//消息标识符int sysFlag = 0;//消息压缩标识boolean msgBodyCompressed = false;/** 5 尝试压缩消息*/if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//事务消息标志,prepare消息final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}/** 6 如果存在CheckForbiddenHook,则执行checkForbidden方法* 为什么叫禁止钩子呢,可能是想要使用者将不可发送消息的检查放在这个钩子函数里面吧(猜测)*/if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}/** 7 如果存在SendMessageHook,则执行sendMessageBefore方法*/if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}/** 8 设置请求头信息*/SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);//针对重试消息的处理if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {//获取消息重新消费次数属性值String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {//将重新消费次数设置到请求头中,并且清除该属性requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}//获取消息的最大重试次数属性值String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {//将最大重新消费次数设置到请求头中,并且清除该属性requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}/** 9 根据不同的发送模式,发送消息*/SendResult sendResult = null;switch (communicationMode) {/** 异步发送模式*/case ASYNC:/** 首先克隆并还原消息** 该方法的finally中已经有还原消息的代码了,为什么在异步发送消息之前,还要先还原消息呢?** 因为异步发送时 finally 重新赋值的时机并不确定,有很大概率是在第一次发送结束前就完成了 finally 中的赋值,* 因此在内部重试前 msg.body 大概率已经被重新赋值过,而 onExceptionImpl 中的重试逻辑 MQClientAPIImpl.sendMessageAsync 不会再对数据进行压缩,* 简言之,在异步发送的情况下,如果调用 onExceptionImpl 内部的重试,有很大概率发送的是无压缩的数据*/Message tmpMessage = msg;boolean messageCloned = false;//如果开启了消息压缩if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66//克隆一个messagetmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;//恢复原来的消息体msg.setBody(prevBody);}//如果topic整合了namespaceif (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}//还原topicmsg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}/** 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常*/long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}/** 10 发送异步消息*/sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;/** 单向、同步发送模式*/case ONEWAY:case SYNC:/** 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常*/long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}/** 10 发送单向、同步消息*/sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}/** 9 如果存在SendMessageHook,则执行sendMessageAfter方法*/if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}//返回执行结果return sendResult;//如果抛出了异常,如果存在SendMessageHook,则执行sendMessageAfter方法} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {/** 对消息进行恢复* 1、因为客户端可能还需要查看原始的消息内容,如果是压缩消息,则无法查看* 2、另外如果第一次压缩后消息还是大于4k,如果不恢复消息,那么客户端使用该message重新发送的时候,还会进行一次消息压缩*/msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
2.6.1 findBrokerAddressInPublish查找broker地址

根据brokerName向brokerAddrTable查找数据, 因为生产者只会向Master发送数据, 所以返回的是Master地址。

在这里插入图片描述

/*** MQClientInstance的方法*/
public String findBrokerAddressInPublish(final String brokerName) {//查询brokerAddrTable缓存的数据HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);//返回Mater节点的地址if (map != null && !map.isEmpty()) {return map.get(MixAll.MASTER_ID);}return null;
}
2.6.2 brokerVIPChannel判断vip通道

调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。

在这里插入图片描述

/*** MixAll的方法*/
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {//如果开启了vip通道if (isChange) {int split = brokerAddr.lastIndexOf(":");String ip = brokerAddr.substring(0, split);String port = brokerAddr.substring(split + 1);//重新拼接brokerAddr,其中port - 2String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);return brokerAddrNew;} else {//如果没有开启vip通道,那么返回原地址return brokerAddr;}
}

消费者拉取消息只能请求普通通道, 但是生产者可以选择vip和普通通道。

2.6.3 setUniqID生成uniqId

设置到UNIQ_KEY属性中, 批量消息在生成时就已经设置uniqId。

uniqId表示客户端生成的唯一一条消息。

在这里插入图片描述

/*** MessageClientIDSetter的方法*/
public static void setUniqID(final Message msg) {//如果这条消息不存在"UNIQ_KEY"属性,那么创建uniqId并且存入"UNIQ_KEY"属性中if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());}
}
2.6.4 tryToCompressMessage压缩消息

消息压缩, 压缩比为5, 压缩之后, 设置压缩标志, 批量消息不支持压缩, 消息压缩有利于网络传输数据。

在这里插入图片描述

/*** DefaultMQProducerImpl的方法*/
private boolean tryToCompressMessage(final Message msg) {//如果是批量消息,那么不进行压缩if (msg instanceof MessageBatch) {//batch dose not support compressing right nowreturn false;}byte[] body = msg.getBody();if (body != null) {//如果消息长度大于4Kif (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {try {//进行压缩,使用的JDK自带的压缩类byte[] data = UtilAll.compress(body, zipCompressLevel);if (data != null) {//重新设置到body中msg.setBody(data);return true;}} catch (IOException e) {log.error("tryToCompressMessage exception", e);log.warn(msg.toString());}}}return false;
}

2.7 updateFaultItem更新故障表

发送消息后, 无论正常与否, 调用updateFaultItem更新故障表, 更新本地错误表缓存数据, 用于延迟时间的故障转移功能。

故障转移功能在此前的selectOneMessageQueue方法中被使用到, 用于查找一个可用的消息队列, updateFaultItem方法判断是否开启了故障转移功能, 会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。

在这里插入图片描述

/*** DefaultMQProducerImpl的方法* @param brokerName brokerName* @param currentLatency 当前延迟* @param isolation 是否使用默认隔离*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {//调用MQFaultStrategy#updateFaultItem方法this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

MQFaultStrategy#updateFaultItem方法。其根据本次发送消息的延迟时间currentLatency, 计算出broker的隔离时间duration, 即是broker的下一个可用时间点。用于更新故障记录表。

/*** MQFaultStrategy的方法** @param brokerName     brokerName* @param currentLatency 当前延迟* @param isolation      是否使用默认隔离时间*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {//如果开启了故障转移,即sendLatencyFaultEnable为true,默认falseif (this.sendLatencyFaultEnable) {//根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration//如果isolation为true,则使用默认隔离时间30000,即30slong duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);//更新故障记录表this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}
2.7.1 computeNotAvailableDuration计算隔离时间

根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration, 据此即可以计算出该broker的下一个可用时间点。

在这里插入图片描述

//延迟等级
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时间等级
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};/*** MQFaultStrategy的方法** @param currentLatency 当前延迟* @return 故障延迟的时间*/
private long computeNotAvailableDuration(final long currentLatency) {//倒叙遍历latencyMaxfor (int i = latencyMax.length - 1; i >= 0; i--) {//选择broker延迟时间对应的broker不可用时间,默认30000对应的故障延迟的时间为600000,即10分钟if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}

latencyMax为延迟等级, notAvailableDuration为隔离时间。之间的关系:

在这里插入图片描述

2.7.2 updateFaultItem更新故障表

该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息, 将会设置发送消息的延迟时间currentLatency属性, 以及下一个可用时间点LatencyFaultToleranceImpl属性。

下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间, 在selectOneMessageQueue方法选取消息队列的时候, 如果开启了故障转移, 那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。

在这里插入图片描述

/*** LatencyFaultToleranceImpl的方法** @param name                 brokerName* @param currentLatency       当前延迟* @param notAvailableDuration 隔离时间(不可用时间)*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {//获取该broker此前的故障记录数据FaultItem old = this.faultItemTable.get(name);//如果此前没有数据,那么设置一个新对象肌凝乳if (null == old) {final FaultItem faultItem = new FaultItem(name);//设置当前延迟faultItem.setCurrentLatency(currentLatency);//设置下一次可用时间点faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);//已有故障记录,更新old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}} else {//已有故障记录,更新old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}

3.总结

  1. 生产者消息重试: RocketMQ的消费者消息重试和生产者消息重投。

  2. 生产者故障转移: 通过sendLatencyFaultEnable属性配置是否开启, 目的是为了保证每次发送消息成功, 是一种高可用手段。

    1. 延迟时间的故障转移, 需要sendLatencyFaultEnable为true, 消息队列选择时候, 可用过滤mq认为不可用的broker, 以免不断为宕机的broker发送消息, 选取一个延迟比较短的broker, 实现消息发送高可用。
    2. 没有开启延迟时间的故障转移的时候, 在轮询选择mq的时候, 不会选择上一次发送失败的broker, 实现消息发送高可用。
  3. Vip通道: VIP通道用于隔离读写操作, 消费者拉取消息只能请求普通通道, 生产者可用选择vip或者普通通道。

  4. 故障转移表: RocketMQ的Producer生产者故障转移依赖于故障转移表实现, 是一个HasmMap。消息发送结束之后, 会根据本次发送消息的延迟时间currentLatency, 计算该broker的隔离时间duration, 即为broker的下一次可用时间点。然后更新故障记录表。故障转移表的key为brokerName, value为未来该broker可用时间。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/3693.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLaMA模型微调版本:斯坦福 Alpaca 详解

项目代码&#xff1a;https://github.com/tatsu-lab/stanford_alpaca 博客介绍&#xff1a;https://crfm.stanford.edu/2023/03/13/alpaca.html Alpaca 总览 Alpaca 是 LLaMA-7B 的微调版本&#xff0c;使用Self-instruct[2]方式借用text-davinct-003构建了52K的数据&#x…

vue3框架开发uniapp高仿度小满金融App项目

vue3框架开发uniapp高仿度小满金融App项目 心血来潮写了度小满前端项目使用vue3开发地址&#xff1a;度小满金融 下面是实现效果

第一次安装cocoapods经历

先是执行&#xff1a;sudo gem install cocoapods 报错&#xff1a; ERROR: Error installing cocoapods: The last version of activesupport (> 5.0, < 8) to support your Ruby & RubyGems was 6.1.7.3. Try installing it with gem install activesupport -v…

CSS 备忘录2-动画、渐变、颜色、选择器等

1、背景 background属性是八个属性的简写形式&#xff1a; background-image 指定一个文件或生成的颜色渐变作为背景图片background-position 设置图片的初始位置background-size 指定背景图片的渲染尺寸background-repeat 是否平铺图片ba…

智能相机的功能介绍

智能视觉检测相机主要是应用在工业检测领域图像分析识别、视觉检测判断。相机具有颜色有无判别、颜色面积计算、轮廓查找定位、物体特征灰度匹配、颜色或灰度浓淡检测、物体计数、尺寸测量、条码二维码识别读取、尺寸测量、机械收引导定位、字符识别等功能。相机带有HDMI高清视…

Java版企业电子招投标系统源码 Spring Cloud+Spring Boot 电子招标采购系统功能清单

一、立项管理 1、招标立项申请 功能点&#xff1a;招标类项目立项申请入口&#xff0c;用户可以保存为草稿&#xff0c;提交。 2、非招标立项申请 功能点&#xff1a;非招标立项申请入口、用户可以保存为草稿、提交。 3、采购立项列表 功能点&#xff1a;对草稿进行编辑&#x…

电脑键盘点击记录

这里写自定义目录标题 通过敲击键盘&#xff0c;记录键盘按键&#xff0c;并记录下来&#xff0c;保存在电脑一个路径下&#xff0c;txt文档格式记录 通过敲击键盘&#xff0c;记录键盘按键&#xff0c;并记录下来&#xff0c;保存在电脑一个路径下&#xff0c;txt文档格式记录…

English Learning - L3 纠音 W8 Lesson7 Ted Living Beyond Limits 2023.06.27 周二

朗读内容&#xff1a; Lesson 7 Day 47 - 51 句子 Ted Living Beyond Limits 3-22

kafka入门,Kafka 副本(十三)

Kafka副本 副本基本信息 1&#xff09;Kafka副本作用&#xff0c;提高数据可靠性 2&#xff09;Kafka默认副本1个&#xff0c;生产环境一般配置2个&#xff0c;保证数据可靠性&#xff0c;太多副本会增加磁盘存储空间&#xff0c;增加网络上数据传输&#xff0c;降低效率 3&a…

Kafka最基础使用

一、概念 2、应用场景 异步处理系统解耦流量削峰日志处理 3、消息队列的两种模式 点对点模式 消息发送者生产消息发送到消息队列中&#xff0c;然后消息接收者从消息队列中取出并且消费消息。消息被消费以后&#xff0c;消息队列中不再有存储&#xff0c;所以消息接收者不可…

【爬虫】对某某贴吧主页的爬虫分析+源码

1. 网站分析 想要的内容有标题、时间和帖子跳转链接 查看网站源代码&#xff0c;发现想要的内容就在里面&#xff0c;那就好办了&#xff0c;直接上正则&#xff0c;当然beautifulsoup也不是不可以 2. Python源码 import requests import re from prettytable import PrettyTa…

Solr框架 02.Solr操作(document操作和query查询)

菜单项目Documents使用办法 其中的document选项&#xff1a; 以XML格式举例 1新增/修改 当id不存在时新增&#xff0c;当id存在修改。 <doc> <field name"id">8</field> <field name"name">明天更大卖</field> <field n…