1 前言
本节主要记录下基于 AQS 衍生出来的一些常用锁比如:CountDownLatch、ReentrantLock、Semaphore、ReentrantReadWriteLock 等他们在源码中的一些应用,好记性不如烂笔头。
2 CountDownLatch
2.1 RocketMQ 中 Broker 向 所有NameServer 的注册
RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。 Broker启动时向集群中所有的NameServ巳r发送心跳语句,每隔 30s 向集群中所有NameServer发送心跳包, NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLivelnfo 的 lastUpdateTimestamp ,然后 Nam巳 Server 每隔 10s 扫描 brokerLiveTable,如果连续 !20s 没有收到心跳包, NameServer将移除该Broker 的路由信息同时关闭 Socket连接。
那么这里比如我有 4个 NameServer,是循环遍历一个一个注册么?其实不是,这里就用到了线程池 + CountDownLatch,我们看下:
public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean enableActingMaster,final boolean compressed,final Long heartbeatTimeoutMillis,final BrokerIdentity brokerIdentity) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setEnableActingMaster(enableActingMaster);requestHeader.setCompressed(false);if (heartbeatTimeoutMillis != null) {requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);}RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);// 创建一个计数器锁(数量为 nameServer 的个数)final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 循环遍历for (final String namesrvAddr : nameServerAddressList) {// 往线程池中扔任务brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {@Overridepublic void run0() {try {RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);} catch (Exception e) {LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);} finally {// 计数器减1 countDownLatch.countDown();}}});}try {// 超时等待都注册完毕if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);}} catch (InterruptedException ignore) {}}return registerBrokerResultList; }
可以看到是通过遍历,向线程池中执行任务,每个线程注册完会让锁减1,最后下边 await 等待所有的线程都注册完毕。
3 ReentrantLock
4 Semaphore
5 ReentrantReadWriteLock
6 小结
归纳总结是对技术的巩固以及认识的增强,看看人家别人怎么用的,什么场景下用的,用法上跟自己的有什么不同,甚至在自己写业务的时候,是不是能直接借鉴下,哈哈哈,受益颇多都,所以大家也要学会总结积累哈。本节就主要记录下锁的一些源码应用(会持续增加),有理解不对的地方欢迎指正哈。