分布式锁—7.Curator的分布式锁

news/2025/3/10 3:45:12/文章来源:https://www.cnblogs.com/mjunz/p/18760078

大纲

1.Curator的可重入锁的源码

2.Curator的非可重入锁的源码

3.Curator的可重入读写锁的源码

4.Curator的MultiLock源码

5.Curator的Semaphore源码

 

1.Curator的可重入锁的源码

(1)InterProcessMutex获取分布式锁

(2)InterProcessMutex的初始化

(3)InterProcessMutex.acquire()尝试获取锁

(4)LockInternals.attemptLock()尝试获取锁

(5)不同客户端线程获取锁时的互斥实现

(6)同一客户端线程可重入加锁的实现

(7)客户端线程释放锁的实现

(8)客户端线程释放锁后其他线程获取锁的实现

(9)InterProcessMutex就是一个公平锁

 

(1)InterProcessMutex获取分布式锁

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);client.start();System.out.println("已经启动Curator客户端");//获取分布式锁InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");lock.acquire();Thread.sleep(1000);lock.release();}
}

(2)InterProcessMutex的初始化

设置锁的节点路径basePath + 初始化一个LockInternals对象实例。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME = "lock-";...public InterProcessMutex(CuratorFramework client, String path) {this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {this(client, path, LOCK_NAME, 1, driver);}//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//1.设置锁的节点路径basePath = PathUtils.validatePath(path);//2.初始化一个LockInternals对象实例internals = new LockInternals(client, driver, path, lockName, maxLeases);}
}public class LockInternals {private final LockInternalsDriver driver;private final String lockName;private volatile int maxLeases;private final WatcherRemoveCuratorFramework client;private final String basePath;private final String path;...LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver = driver;this.lockName = lockName;this.maxLeases = maxLeases;this.client = client.newWatcherRemoveCuratorFramework();this.basePath = PathUtils.validatePath(path);this.path = ZKPaths.makePath(path, lockName);}...
}

(3)InterProcessMutex.acquire()尝试获取锁

LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象,用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount,用于锁的重入次数计数。

 

在执行InterProcessMutex的acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的LockData数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间time = -1。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME = "lock-";//一个线程对应一个LockData数据对象private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();...//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//设置锁的路径basePath = PathUtils.validatePath(path);//初始化LockInternalsinternals = new LockInternals(client, driver, path, lockName, maxLeases);}@Overridepublic void acquire() throws Exception {//获取分布式锁,会一直阻塞等待直到获取成功//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用if (!internalLock(-1, null)) {throw new IOException("Lost connection while trying to acquire lock: " + basePath);}}private boolean internalLock(long time, TimeUnit unit) throws Exception {//获取当前线程Thread currentThread = Thread.currentThread();//获取当前线程对应的LockData数据LockData lockData = threadData.get(currentThread);if (lockData != null) {//可重入计算lockData.lockCount.incrementAndGet();return true;}//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath != null) {//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象LockData newLockData = new LockData(currentThread, lockPath);//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中threadData.put(currentThread, newLockData);return true;}return false;}//LockData是InterProcessMutex的一个静态内部类private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数private LockData(Thread owningThread, String lockPath) {this.owningThread = owningThread;this.lockPath = lockPath;}}protected byte[] getLockNodeBytes() {return null;}...
}

(4)LockInternals.attemptLock()尝试获取锁

先创建临时节点,再判断是否满足获取锁的条件。

 

步骤一:首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。

 

步骤二:然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的PredicateResults对象。

 

具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁,默认是1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。

 

获取锁成功,则会中断LockInternals的internalLockLoop()方法的while循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象,然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。

 

获取锁失败,则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息,并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后,便会调用wait()方法将当前线程挂起。

 

所以前一个节点发生变化时,便会通知添加的Watcher监听。然后便会唤醒阻塞的线程,继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。

public class LockInternals {private final LockInternalsDriver driver;LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver = driver;this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称...}...String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//获取当前时间final long startMillis = System.currentTimeMillis();//默认情况下millisToWait=nullfinal Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//默认情况下localLockNodeBytes也是nullfinal byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;boolean hasTheLock = false;//是否已经获取到锁boolean isDone = false;//是否正在获取锁while (!isDone) {isDone = true;//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点ourPath = driver.createsTheLock(client, path, localLockNodeBytes);//2.检查是否获取到了锁hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);}if (hasTheLock) {return ourPath;}return null;}private final Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;...while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock = true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息,并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下,millisToWait = nullif (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait <= 0) {doDelete = true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}List<String> getSortedChildren() throws Exception {//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表return getSortedChildren(client, basePath, lockName, driver);}public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = client.getChildren().forPath(basePath);//对节点名称进行排序List<String> sortedList = Lists.newArrayList(children);Collections.sort(sortedList,new Comparator<String>() {@Overridepublic int compare(String lhs, String rhs) {return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));}});return sortedList;}...
}public class StandardLockInternalsDriver implements LockInternalsDriver {...//级联创建一个临时顺序节点@Overridepublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默认情况下传入的lockNodeBytes=nullif (lockNodeBytes != null) {ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);} else {//创建临时顺序节点ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;}//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败boolean getsTheLock = ourIndex < maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}

(5)不同客户端线程获取锁时的互斥实现

maxLeases代表了同时允许多少个客户端可以获取到锁,默认值是1。能否获取锁的判断就是:线程创建的节点的位置outIndex < maxLeases。当线程1创建的节点在节点列表中排第一时,满足outIndex = 0 < maxLeases = 1,可以获取锁。当线程2创建的节点再节点列表中排第二时,不满足outIndex = 1 < maxLeases = 1,所以不能获取锁。从而实现线程1和线程2获取锁时的互斥。

 

(6)同一客户端线程可重入加锁的实现

客户端线程重复获取锁时,会重复调用InterProcessMutex的internalLock()方法。在InterProcessMutex的internalLock()方法中:线程第一次获取锁成功会创建一个LockData对象,并存放在一个Map中。线程第二次获取锁时,便会从这个Map中取出这个LockData对象,并对LockData对象中的重入计数器lockCount进行递增,接着就返回true。以此实现可重入加锁。

 

(7)客户端线程释放锁的实现

客户端线程释放锁时会调用InterProcessMutex的release()方法。

 

首先对LockData里的重入计数器进行递减。当重入计数器大于0时,直接返回。当重入计数器为0时才执行下一步删除节点的操作。

 

然后删除客户端线程创建的临时顺序节点,client.delete().guaranteed().forPath(ourPath)。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();...@Overridepublic void release() throws Exception {//获取当前线程Thread currentThread = Thread.currentThread();//获取当前线程对应的LockData对象LockData lockData = threadData.get(currentThread);if (lockData == null) {throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}//1.首先对LockData里的重入计数器lockCount进行递减int newLockCount = lockData.lockCount.decrementAndGet();if (newLockCount > 0) {//当重入计数器大于0时,直接返回return;}if (newLockCount < 0) {throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}try {//2.当重入计数器为0时执行删除节点的操作internals.releaseLock(lockData.lockPath);} finally {threadData.remove(currentThread);}}...
}public class LockInternals {...final void releaseLock(String lockPath) throws Exception {client.removeWatchers();revocable.set(null);deleteOurPath(lockPath);}private void deleteOurPath(String ourPath) throws Exception {//删除节点client.delete().guaranteed().forPath(ourPath);}...
}

(8)客户端线程释放锁后其他线程获取锁的实现

由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法,也就是唤醒该线程,让其继续执行while循环获取锁。

public class LockInternals {...private final Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;...while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock = true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息,并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下,millisToWait = nullif (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait <= 0) {doDelete = true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}...
}

(9)InterProcessMutex就是一个公平锁

因为所有客户端线程都会创建一个顺序节点,然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁,实现了所有客户端排队获取锁。

 

2.Curator的非可重入锁的源码

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

 

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

非可重入锁:同一个时间只能有一个客户端线程获取到锁,其他线程都要排队,而且同一个客户端线程是不可重入加锁的。

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");//非可重入锁InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks");lock.acquire();Thread.sleep(3000);lock.release();}
}

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

Curator的非可重入锁是基于Semaphore来实现的,也就是将Semaphore允许获取Lease的客户端线程数设置为1,从而实现同一时间只能有一个客户端线程获取到Lease。

public class InterProcessSemaphoreMutex implements InterProcessLock {private final InterProcessSemaphoreV2 semaphore;private final WatcherRemoveCuratorFramework watcherRemoveClient;private volatile Lease lease;public InterProcessSemaphoreMutex(CuratorFramework client, String path) {watcherRemoveClient = client.newWatcherRemoveCuratorFramework();this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);}@Overridepublic void acquire() throws Exception {//获取非可重入锁就是获取Semaphore的Leaselease = semaphore.acquire();}@Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception {Lease acquiredLease = semaphore.acquire(time, unit);if (acquiredLease == null) {return false;}lease = acquiredLease;return true;}@Overridepublic void release() throws Exception {//释放非可重入锁就是释放Semaphore的LeaseLease lease = this.lease;Preconditions.checkState(lease != null, "Not acquired");this.lease = null;lease.close();watcherRemoveClient.removeWatchers();}
}

 

3.Curator的可重入读写锁的源码

(1)Curator的可重入读写锁InterProcessReadWriteLock的使用

(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化

(3)InterProcessMutex获取锁的源码

(4)先获取读锁 + 后获取读锁的情形分析

(5)先获取读锁 + 后获取写锁的情形分析

(6)先获取写锁 + 后获取读锁的情形分析

(7)先获取写锁 + 再获取写锁的情形分析

 

(1)Curator的可重入读写锁InterProcessReadWriteLock的使用

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");//读写锁InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/locks");lock.readLock().acquire();lock.readLock().release();lock.writeLock().acquire();lock.writeLock().release();}
}

(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化

读锁和写锁都是基于可重入锁InterProcessMutex的子类来实现的。读锁和写锁的获取锁和释放锁逻辑,就是使用InterProcessMutex的逻辑。

public class InterProcessReadWriteLock {private final InterProcessMutex readMutex;//读锁private final InterProcessMutex writeMutex;//写锁//must be the same length. LockInternals depends on itprivate static final String READ_LOCK_NAME  = "__READ__";private static final String WRITE_LOCK_NAME = "__WRIT__";...//InterProcessReadWriteLock的初始化public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);//写锁的初始化writeMutex = new InternalInterProcessMutex(client,basePath,WRITE_LOCK_NAME,//写锁的lockName='__WRIT__'lockData,1,//写锁的maxLeasesnew SortingLockInternalsDriver() {@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {return super.getsTheLock(client, children, sequenceNodeName, maxLeases);}});//读锁的初始化readMutex = new InternalInterProcessMutex(client,basePath,READ_LOCK_NAME,//读锁的lockName='__READ__'lockData,Integer.MAX_VALUE,//读锁的maxLeasesnew SortingLockInternalsDriver() {@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {return readLockPredicate(children, sequenceNodeName);}});}private static class InternalInterProcessMutex extends InterProcessMutex {private final String lockName;private final byte[] lockData;InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver) {super(client, path, lockName, maxLeases, driver);this.lockName = lockName;this.lockData = lockData;}...}public InterProcessMutex readLock() {return readMutex;}public InterProcessMutex writeLock() {return writeMutex;}...
}

(3)InterProcessMutex获取锁的源码

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {private final LockInternals internals;private final String basePath;private static final String LOCK_NAME = "lock-";//一个线程对应一个LockData数据对象private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();...//初始化InterProcessMutexInterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {//设置锁的路径basePath = PathUtils.validatePath(path);//初始化LockInternalsinternals = new LockInternals(client, driver, path, lockName, maxLeases);}@Overridepublic void acquire() throws Exception {//获取分布式锁,会一直阻塞等待直到获取成功//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用if (!internalLock(-1, null)) {throw new IOException("Lost connection while trying to acquire lock: " + basePath);}}private boolean internalLock(long time, TimeUnit unit) throws Exception {//获取当前线程Thread currentThread = Thread.currentThread();//获取当前线程对应的LockData数据LockData lockData = threadData.get(currentThread);if (lockData != null) {//可重入计算lockData.lockCount.incrementAndGet();return true;}//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if (lockPath != null) {//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象LockData newLockData = new LockData(currentThread, lockPath);//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中threadData.put(currentThread, newLockData);return true;}return false;}//LockData是InterProcessMutex的一个静态内部类private static class LockData {final Thread owningThread;final String lockPath;final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数private LockData(Thread owningThread, String lockPath) {this.owningThread = owningThread;this.lockPath = lockPath;}}protected byte[] getLockNodeBytes() {return null;}...
}public class LockInternals {private final LockInternalsDriver driver;LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {this.driver = driver;this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称...}...String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {//获取当前时间final long startMillis = System.currentTimeMillis();//默认情况下millisToWait=nullfinal Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//默认情况下localLockNodeBytes也是nullfinal byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;boolean hasTheLock = false;//是否已经获取到锁boolean isDone = false;//是否正在获取锁while (!isDone) {isDone = true;//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点ourPath = driver.createsTheLock(client, path, localLockNodeBytes);//2.检查是否获取到了锁hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);}if (hasTheLock) {return ourPath;}return null;}private final Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {//唤醒LockInternals中被wait()阻塞的线程client.postSafeNotify(LockInternals.this);}};//检查是否获取到了锁private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;...while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = getSortedChildren();//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if (predicateResults.getsTheLock()) {//获取锁成功//返回truehaveTheLock = true;} else {//获取锁失败//获取前一个节点路径名称String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak//通过getData()获取前一个节点路径在zk的信息,并添加watch监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);//默认情况下,millisToWait = nullif (millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait <= 0) {doDelete = true;//timed out - delete our nodebreak;}wait(millisToWait);//阻塞} else {wait();//阻塞}}}}...return haveTheLock;}List<String> getSortedChildren() throws Exception {//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表return getSortedChildren(client, basePath, lockName, driver);}public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表List<String> children = client.getChildren().forPath(basePath);//对节点名称进行排序List<String> sortedList = Lists.newArrayList(children);Collections.sort(sortedList,new Comparator<String>() {@Overridepublic int compare(String lhs, String rhs) {return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));}});return sortedList;}...
}public class StandardLockInternalsDriver implements LockInternalsDriver {...//级联创建一个临时顺序节点@Overridepublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;//默认情况下传入的lockNodeBytes=nullif (lockNodeBytes != null) {ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);} else {//创建临时顺序节点ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;}//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败boolean getsTheLock = ourIndex < maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}

(4)先获取读锁 + 后获取读锁的情形分析

当线程创建完临时顺序节点,并获取到排好序的节点列表children后,执行LockInternalsDriver的getsTheLock()方法获取能否成功加锁的信息时,会执行到InterProcessReadWriteLock的readLockPredicate()方法。

 

由于此时firstWriteIndex = Integer.MAX_VALUE,所以无论多少线程尝试获取读锁,都能满足ourIndex < firstWriteIndex,也就是getsTheLock的值会为true,即表示可以获取读锁。

 

所以读读不互斥。

public class InterProcessReadWriteLock {...//sequenceNodeName是当前线程创建的临时顺序节点的路径名称private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {if (writeMutex.isOwnedByCurrentThread()) {return new PredicateResults(null, true);}int index = 0;int firstWriteIndex = Integer.MAX_VALUE;int ourIndex = -1;for (String node : children) {if (node.contains(WRITE_LOCK_NAME)) {firstWriteIndex = Math.min(index, firstWriteIndex);} else if (node.startsWith(sequenceNodeName)) {//找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示ourIndex = index;break;}++index;}StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);boolean getsTheLock = (ourIndex < firstWriteIndex);String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);return new PredicateResults(pathToWatch, getsTheLock);}...
}

(5)先获取读锁 + 后获取写锁的情形分析

一.假设客户端线程1首先成功获取了读锁

那么在/locks目录下,此时已经有了如下这个读锁的临时顺序节点。

/locks/43f3-4c2f-ba98-07a641d351f2-__READ__0000000004

二.然后另一个客户端线程2过来尝试获取写锁

于是该线程2会也会先在/locks目录下创建出如下写锁的临时顺序节点:

/locks/9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005

接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:

[43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]

然后会执行StandardLockInternalsDriver的getsTheLock()方法。由于初始化写锁时,设置了其maxLeases是1,而在StandardLockInternalsDriver的getsTheLock()方法中,判断线程能成功获取写锁的依据是:ourIndex < maxLeases。即如果要成功获取写锁,那么线程创建的节点在子节点列表里必须排第一。

 

而此时,由于之前已有线程获取过一个读锁,而后来又有其他线程往里面创建一个写锁的临时顺序节点。所以写锁的临时顺序节点在子节点列表children里排第二,ourIndex是1。所以index = 1 < maxLeases = 1,条件不成立。

 

因此,此时客户端线程2获取写锁失败。于是该线程便会给前一个节点添加一个监听器,并调用wait()方法把自己挂起。如果前面一个节点被删除释放了锁,那么该线程就会被唤醒,从而再次尝试判断自己创建的节点是否在当前子节点列表中排第一。如果是,那么就表示获取写锁成功。

public class StandardLockInternalsDriver implements LockInternalsDriver {...//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁@Overridepublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置int ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//maxLeases代表的是同时允许多少个客户端可以获取到锁//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败boolean getsTheLock = ourIndex < maxLeases;//获取当前节点需要watch的前一个节点路径String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);}...
}

(6)先获取写锁 + 后获取读锁的情形分析

一.假设客户端线程1先获取了写锁

那么在/locks目录下,此时已经有了如下这个写锁的临时顺序节点。

/locks/4383-466e-9b86-fda522ea061a-__WRIT__0000000006

二.然后另一个客户端线程2过来尝试获取读锁

于是该线程2会也会先在/locks目录下创建出如下读锁的临时顺序节点:

/locks/5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007

接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:

[4383-466e-9b86-fda522ea061a-__WRIT__0000000006,
5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007]

然后会执行LockInternalsDriver的getsTheLock()方法获取能否加锁的信息,也就是会执行InterProcessReadWriteLock的readLockPredicate()方法。

public class InterProcessReadWriteLock {...//sequenceNodeName是当前线程创建的临时顺序节点的路径名称private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {//如果是同一个客户端线程,先加写锁,再加读锁,是可以成功的,不会互斥if (writeMutex.isOwnedByCurrentThread()) {return new PredicateResults(null, true);}int index = 0;int firstWriteIndex = Integer.MAX_VALUE;int ourIndex = -1;for (String node : children) {if (node.contains(WRITE_LOCK_NAME)) {firstWriteIndex = Math.min(index, firstWriteIndex);} else if (node.startsWith(sequenceNodeName)) {//找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示ourIndex = index;break;}++index;}StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);boolean getsTheLock = (ourIndex < firstWriteIndex);String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);return new PredicateResults(pathToWatch, getsTheLock);}...
}

在InterProcessReadWriteLock的readLockPredicate()方法中,如果是同一个客户端线程,先获取写锁,再获取读锁,是不会互斥的。如果是不同的客户端线程,线程1先获取写锁,线程2再获取读锁,则互斥。因为线程2执行readLockPredicate()方法在遍历子节点列表(children)时,如果在子节点列表(children)中发现了一个写锁,会设置firstWriteIndex=0。而此时线程2创建的临时顺序节点的ourIndex=1,所以不满足ourIndex(1) < firstWriteIndex(0),于是线程2获取读锁失败。

 

总结,获取读锁时,在当前线程创建的节点前面:如果还有写锁对应的节点,那么firstWriteIndex就会被重置为具体位置。如果没有写锁对应的节点,那么firstWriteIndex就是MAX_VALUE。而只要firstWriteIndex为MAX_VALUE,那么就可以不断允许获取读锁。

 

(7)先获取写锁 + 再获取写锁的情形分析

如果客户端线程1先获取了写锁,然后后面客户端线程2来获取这个写锁。此时线程2会发现自己创建的节点排在节点列表中的第二,不是第一。于是获取写锁失败,进行阻塞挂起。等线程1释放了写锁后,才会唤醒线程2继续尝试获取写锁。

 

4.Curator的MultiLock源码

(1)Curator的MultiLock的使用

(2)Curator的MultiLock的源码

 

(1)Curator的MultiLock的使用

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");//MultiLockInterProcessLock lock1 = new InterProcessMutex(client, "/locks/lock_01");InterProcessLock lock2 = new InterProcessMutex(client, "/locks/lock_02");InterProcessLock lock3 = new InterProcessMutex(client, "/locks/lock_03");List<InterProcessLock> locks = new ArrayList<InterProcessLock>();locks.add(lock1);locks.add(lock2);locks.add(lock3);InterProcessMultiLock multiLock = new InterProcessMultiLock(locks);}
}

(2)Curator的MultiLock的源码

MultiLock原理:依次遍历获取每个锁,阻塞直到获取每个锁为止,然后返回true。如果过程中有报错,依次释放已经获取到的锁,然后返回false。

public class InterProcessMultiLock implements InterProcessLock {private final List<InterProcessLock> locks;public InterProcessMultiLock(List<InterProcessLock> locks) {this.locks = ImmutableList.copyOf(locks);}//获取锁@Overridepublic void acquire() throws Exception {acquire(-1, null);}@Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception {Exception exception = null;List<InterProcessLock> acquired = Lists.newArrayList();boolean success = true;//依次遍历获取每个锁,阻塞直到获取每个锁为止for (InterProcessLock lock : locks) {try {if (unit == null) {lock.acquire();acquired.add(lock);} else  {if (lock.acquire(time, unit)) {acquired.add(lock);} else {success = false;break;}}} catch (Exception e) {ThreadUtils.checkInterrupted(e);success = false;exception = e;}}if (!success) {for (InterProcessLock lock : reverse(acquired)) {try {lock.release();} catch (Exception e) {ThreadUtils.checkInterrupted(e);// ignore}}}if (exception != null) {throw exception;}return success;}@Overridepublic synchronized void release() throws Exception {Exception baseException = null;for (InterProcessLock lock : reverse(locks)) {try {lock.release();} catch (Exception e) {ThreadUtils.checkInterrupted(e);if (baseException == null) {baseException = e;} else {baseException = new Exception(baseException);}}}if (baseException != null) {throw baseException;}}...
}

 

5.Curator的Semaphore源码

(1)基于InterProcessSemaphoreV2使用Semaphore

(2)InterProcessSemaphoreV2的初始化

(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease

(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease

 

Semaphore信号量,就是指定同时可以有多个线程获取到锁。

 

(1)基于InterProcessSemaphoreV2使用Semaphore

public class Demo {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",//zk的地址5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开3000,//连接zk时的超时时间retryPolicy);client.start();System.out.println("已经启动Curator客户端,完成zk的连接");//获取SemaphoreInterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphore", 3);Lease lease = semaphore.acquire();//获取Semaphore的一个锁Thread.sleep(3000);semaphore.returnLease(lease);//向Semaphore返还一个锁}
}

(2)InterProcessSemaphoreV2的初始化

public class InterProcessSemaphoreV2 {private final WatcherRemoveCuratorFramework client;private final InterProcessMutex lock;private final String leasesPath;private volatile int maxLeases;...//maxLeases表示该实例可以允许获取的lease数量public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {this(client, path, maxLeases, null);}//初始化InterProcessSemaphoreV2时,传入的参数path = "/semaphore",参数maxLeases = 3private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {this.client = client.newWatcherRemoveCuratorFramework();path = PathUtils.validatePath(path);//锁的path是ZKPaths.makePath(path, LOCK_PARENT) => '/semaphore/locks'//初始化一个InterProcessMutex分布式锁this.lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));this.maxLeases = (count != null) ? count.getCount() : maxLeases;//lease的path是:'/semaphore/leases'this.leasesPath = ZKPaths.makePath(path, LEASE_PARENT);...}...
}

(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease

客户端线程尝试获取Semaphore的一个Lease。

 

步骤一:首先会获取初始化时创建的锁InterProcessMutex

锁的路径是:/semaphore/locks。当多个客户端线程同时执行acquire()获取Lease时只会有一个线程成功,而其他线程会基于锁路径下的临时顺序节点来排队获取锁。

 

步骤二:获取锁成功后才会尝试获取Semaphore的Lease

Lease的路径是:/semaphore/leases。此时会先到'/semaphore/leases'目录下创建一个临时顺序节点,然后会调用InterProcessSemaphoreV2的makeLease()方法创建一个Lease。这个Lease对象就是客户端线程成功获取Semaphore的一个Lease。

 

创建完Lease对象后,接着会进入一个for循环,会先获取/semaphore/leases目录下的所有临时顺序节点,并添加监听。然后判断/semaphore/leases目录下节点的数量是否大于maxLeases。如果临时顺序节点的数量小于maxLeases,那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环。如果临时顺序节点的数量大于maxLeases,那么当前客户端线程就要调用wait()进行阻塞等待。

public class InterProcessSemaphoreV2 {private final InterProcessMutex lock;private final Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {//唤醒在InterProcessSemaphoreV2对象中执行wait()而被阻塞的线程client.postSafeNotify(InterProcessSemaphoreV2.this);}};...public Lease acquire() throws Exception {Collection<Lease> leases = acquire(1, 0, null);return leases.iterator().next();}public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception {long startMs = System.currentTimeMillis();boolean hasWait = (unit != null);long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;Preconditions.checkArgument(qty > 0, "qty cannot be 0");ImmutableList.Builder<Lease> builder = ImmutableList.builder();boolean success = false;try {while (qty-- > 0) {int retryCount = 0;long startMillis = System.currentTimeMillis();boolean isDone = false;while (!isDone) {switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) {case CONTINUE: {isDone = true;break;}case RETURN_NULL: {return null;}case RETRY_DUE_TO_MISSING_NODE: {if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");}//try againbreak;}}}}success = true;} finally {if (!success) {returnAll(builder.build());}}return builder.build();}private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception {if (client.getState() != CuratorFrameworkState.STARTED) {return InternalAcquireResult.RETURN_NULL;}if (hasWait) {long thisWaitMs = getThisWaitMs(startMs, waitMs);if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) {return InternalAcquireResult.RETURN_NULL;}} else {//1.首先获取一个分布式锁lock.acquire();}Lease lease = null;boolean success = false;try {//2.尝试获取Semaphore的Lease:创建一个临时顺序节点PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));String nodeName = ZKPaths.getNodeFromPath(path);lease = makeLease(path);...try {synchronized(this) {for(;;) {List<String> children;//3.获取./lease目录下的所有临时顺序节点,并添加watcher监听children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);...//4.判断临时顺序节点的数量是否大于maxLeases//maxLeases表示最多允许多少个客户端线程获取Semaphore的Leaseif (children.size() <= maxLeases) {//如果临时顺序节点的数量小于maxLeases//那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环break;}//如果临时顺序节点的数量大于maxLeases//那么当前客户端线程就要调用wait()进行阻塞等待if (hasWait) {long thisWaitMs = getThisWaitMs(startMs, waitMs);if (thisWaitMs <= 0) {return InternalAcquireResult.RETURN_NULL;}...wait(thisWaitMs);} else {...wait();}}success = true;}} finally {if (!success) {returnLease(lease);}client.removeWatchers();}} finally {//释放掉之前获取的锁lock.release();}builder.add(Preconditions.checkNotNull(lease));return InternalAcquireResult.CONTINUE;}...
}

(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease

执行InterProcessSemaphoreV2的returnLease()方法时,最终会执行makeLease()生成的Lease对象的close()方法,而close()方法会删除在/semaphore/leases目录下创建的临时顺序节点。

 

当/semaphore/leases目录下的节点发生变化时,那些对该目录进行Watcher监听的客户端就会收到通知,于是就会执行Watcher里的process()方法,唤醒执行wait()时被阻塞的线程,从而让这些没有成功获取Semaphore的Lease的线程继续尝试获取Lease。

public class InterProcessSemaphoreV2 {...public void returnLease(Lease lease) {//执行Lease的close()方法CloseableUtils.closeQuietly(lease);}private Lease makeLease(final String path) {return new Lease() {@Overridepublic void close() throws IOException {try {client.delete().guaranteed().forPath(path);} catch (KeeperException.NoNodeException e) {log.warn("Lease already released", e);} catch (Exception e) {ThreadUtils.checkInterrupted(e);throw new IOException(e);}}@Overridepublic byte[] getData() throws Exception {return client.getData().forPath(path);}@Overridepublic String getNodeName() {return ZKPaths.getNodeFromPath(path);}};}...
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/895696.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue2后台管理学习笔记

1 、调试环境 1.1 NVM与NPM区别 NVM 是一个轻量级的工具,通过修改环境变量 PATH 来实现版本切换。它与 npm(Node.js 包管理工具)不同,npm 主要用于管理 Node.js 的包,而 NVM 专注于管理 Node.js 的版本。 1.2 NVM管理多个 Node.js 版本 NVM(Node Version Manager) 是一个…

Ubuntu系统安装Minikube教程

说明 最近在看《Quick Start Kubernetes》,书上使用的是 Docker Desktop 和 Linode Kubernetes Engine(LKE) 进行演示,但是实际开发中都不可能用到这两个工具,所以本人使用 minikube 进行操作。 系统:WSL Ubuntu 22.0。 安装 curl -LO https://github.com/kubernetes/minik…

Transformer 笔记 (CS224N-7)

模型回顾 问题:RNN需要经过k步才能对远距离的单词进行交互,例如这里的was是chef的谓语,二者的关系十分紧密,但是使用线性顺序分析句子会导致如果was和chef的距离较远,它们会难以交互(因为梯度问题) Self Attention 键值对注意力 (1)我们可以将注意力视为在键值存储中执行…

Easyexcel(7-自定义样式)

EasyExcel 提供简便的自定义样式功能,用户可以轻松定制Excel表格样式。通过Style类,可以设置字体、边框、背景颜色等属性,同时支持单元格合并和格式化操作。通过配置WriteHandler,可以实现更多样式定制,满足不同业务需求,让数据展示更直观。注解 @ContentStyle 用于设置内…

预训练笔记 (CS224N-8)

子词模型 前置知识 (1)词法知识介绍语音学是音流无争议的物理学 语音体系假定了一组或多组独特的、分类的单元(音素)传统上,词素是最小的语义单位(例如\(ate,ly,able\)这种),但如今我们需要讨论比单词粒度更细的模型以处理大量的开放词汇(巨大的、无限的单词空间) 例如…

团队项目:新建文件夹(1)团队成员介绍

项目 内容这个作业属于哪个课程 2025年春季软件工程(罗杰、任健)这个作业的要求在哪里 [T.1] 团队项目:团队成员介绍我在这个课程的目标是 培养团队合作精神,开发功能完善的软件这个作业在哪个具体方面帮助我实现目标 团队协作,软件开发理论基础团队介绍 新建文件夹(1)队由…

LSTM 和机器翻译 (CS224N-5)

LSTM (1)LSTM(Long Short-Term Memory RNNs)是Hochreiter和Schmidhuber在1997年提出的一种RNN,用于解决消失梯度问题 (2)在步骤t中,这里有一个隐藏单元\(h^{(t)}\)和一个记忆单元\(c^{(t)}\)它们均为长度为n的向量记忆单元用来存储长期信息LSTM可以从记忆单元中读取、消除和…

解码和 Attention 机制笔记 (CS224N-6)

语言模型的解码△:在讲义中这部分内容是神经机器翻译NMT中,我认为这个适用于所有语言模型所以就单列出来了(1)贪心解码:每一步都取最可能的单词,用前一步的输出作为下一步的输入(存在问题:有一个预测偏差,会导致后面的生成不可挽回,最终结果不尽人意) (2)穷举解码:顾…

语言模型和 RNN 笔记 (CS224N-4)

语言模型定义 (1)语言模型任务是根据给定的单词序列计算下一个单词的概率分布,完成这样的任务的系统就被称作语言模型 (2)也可以认为语言模型是一个为文本分配概率的系统,例如文本 \(x^{(1)},\cdots,x^{(T)}\) 的概率是n-gram语言模型 (1)一个n-gram是由n个连续单词组成的一块…

Java 线程池 ThreadPoolExecutor 的状态控制变量 ctrl

如下是源代码。线程池的主要控制状态 ctl 是一个原子整数,它打包了两个概念字段:workerCount:表示当前有效运行的线程数。 runState:表示线程池的状态(如是否正在运行、关闭等)。为了将这两个字段打包成一个 int,我们将 workerCount 限制为 (2^{29} - 1)(约5亿),而不…

go 方法和函数的区别

图片中的内容总结了Go语言中方法和函数的区别,主要分为三个方面:调用方式不同:函数的调用方式:函数名(实参列表) 方法的调用方式:变量.方法名(实参列表)普通函数的接收者类型限制:对于普通函数,如果接收者是值类型,则不能将指针类型的数据直接传递给它;反之亦然。方法…

德里克昆什肯《量子战争》目录

机翻未校对是机翻的,而且没有经过校对,只能作为SFW编辑部的懒狗开工前的替代品 第一章 第二章 第三章 Chap 4 Chap 5~6 Chap 7~8 Chap 9~10 Chap 11~14 Chap 15~18 Chap 19~22 Chap 23~26 Chap 27~30 Chap 31~34 Chap 35~38 Chap 39~42 Chap 43~46 Chap 47~50 Chap 51~61本文…