Netty源码—10.Netty工具之时间轮

news/2025/4/1 20:30:14/文章来源:https://www.cnblogs.com/mjunz/p/18801239

大纲

1.什么是时间轮

2.HashedWheelTimer是什么

3.HashedWheelTimer的使用

4.HashedWheelTimer的运行流程

5.HashedWheelTimer的核心字段

6.HashedWheelTimer的构造方法

7.HashedWheelTimer添加任务和执行任务

8.HashedWheelTimer的完整源码

9.HashedWheelTimer的总结

10.HashedWheelTimer的应用

 

1.什么是时间轮

简单来说,时间轮是一个高效利用线程资源进行批量化调度的调度器。首先把大批量的调度任务全部绑定到同一个调度器上,然后使用这个调度器对所有任务进行管理、触发、以及运行,所以时间轮能高效管理各种延时任务、周期任务、通知任务。

 

时间轮是以时间作为刻度组成的一个环形队列,所以叫做时间轮。这个环形队列通过一个HashedWheelBucket[]数组来实现,数组的每个元素称为槽,每个槽可以存放一个定时任务列表,叫HashedWheelBucket。HashedWheelBucket是一个双向链表,链表的每个节点表示一个定时任务项HashedWheelTimeout。在HashedWheelTimeout中封装了真正的定时任务TimerTask。

 

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度ticketDuration,其中时间轮的时间格的个数是固定的。

 

如下图示,有16个时间格(槽),假设每个时间格的单位是1s,那么整个时间轮走完一圈需要16s。每秒钟(即时间格的单位也可以为1ms、1min、1h等)指针会沿着顺时针方向转动一格。通过指针移动来获得每个时间格中的任务列表,然后遍历这个时间格内的双向链表的每个任务并执行,依此循环。

 

2.HashedWheelTimer是什么

Netty的HashedWheelTimer是一个粗略的定时器实现,之所以称为粗略的实现是因为该时间轮并没有严格准时地执行定时任务,而是在每隔一个时间间隔之后的时间节点执行,并执行当前时间节点之前到期的定时任务。

 

不过具体的定时任务的时间执行精度,可以通过调节HashedWheelTimer构造方法的时间间隔的大小来进行调节。在大多数网络应用的情况下,由于IO延迟的存在,所以并不会严格要求具体的时间执行精度。因此默认100ms的时间间隔可以满足大多数情况,不需要再花精力去调节该时间精度。

 

3.HashedWheelTimer的使用

public class HashedWheelTimerTest {//构建HashedWheelTimer时间轮//最后通过HASHED_WHEEL_TIMER.newTimeout()方法把需要延迟执行的任务添加到时间轮中private static final HashedWheelTimer HASHED_WHEEL_TIMER = new HashedWheelTimer(new DefaultThreadFactory("demo-timer"),//threadFactory参数表示创建处理任务的线程工厂100,//tickDuration参数表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格 TimeUnit.MILLISECONDS,512,//ticksPerWheel参数表示时间轮上一共有多少个时间格,分配的时间格越多,占用内存空间就越大,这里是512true//leakDetection参数表示是否开启内存泄漏检测);public static void main(String[] args) {System.out.println("延时任务提交");//延时多久执行long delay = 10L;HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("延时任务触发");}}, delay, TimeUnit.SECONDS);}
}

4.HashedWheelTimer的运行流程

步骤一:初始化时间轮

步骤二:启动时间轮

步骤三:添加任务,保存到延时任务队列

步骤四:时间轮指针休眠阻塞,实现转动

步骤五:休眠结束,指针指向下一个时间格(槽)

步骤六:将已经取消的任务从对应的槽中移除

步骤七:将延时任务队列的任务添加到对应的槽中

步骤八:执行时间轮指针指向当前槽的到期任务

 

5.HashedWheelTimer中的关键字段

字段一:wheel

wheel是一个HashedWheelBucket数组,默认的数组大小是512。可以认为wheel是一个TimerTask的哈希表,它的哈希函数是任务的截止日期。所以每个时间轮的时间格数ticksPerWheel默认是512。

 

字段二:tickDuration

时间格跨度,默认100ms。

 

字段三:ticksPerWheel

时间轮的格子数,默认512。

 

字段四:maxPendingTimeouts

时间轮中任务的最大数量。

 

字段五:deadline

延时任务的截止时间,值为当前时间 + 延时任务的延时时间 - 时间轮启动时间。

 

字段六:tick

时间轮启动以来指针总的转动次数。

 

字段七:remainingRounds

槽中延时任务剩余的圈(轮)数,为0时则表示需要执行延时任务了。

 

6.HashedWheelTimer的构造方法

步骤一:构造参数校验及给实际执行延时任务的线程池taskExecutor赋值

步骤二:将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂

步骤三:初始化HashedWheelBucket数组wheel

步骤四:校验tickDuration和ticksPerWheel

步骤五:创建工作线程workerThread,用于指针转动和触发执行时间格里的延时任务

步骤六:给时间轮中延时任务的最大数量maxPendingTimeouts赋值

步骤七:检查HashedWheelTimer的实例数量,如果大于64则打印error日志

//4.1.73.Final
public class HashedWheelTimer implements Timer {private final HashedWheelBucket[] wheel;private final int mask;private final long tickDuration;private final Thread workerThread;private final ResourceLeakTracker<HashedWheelTimer> leak;private final Worker worker = new Worker();private final long maxPendingTimeouts;private final Executor taskExecutor;...//Creates a new timer.//@param threadFactory        创建线程的工厂//@param tickDuration         每格的时间间隔,默认100ms,0.1秒//@param unit                 时间单位,默认为毫秒//@param ticksPerWheel        时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算 //@param leakDetection        如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略//@param maxPendingTimeouts   最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1//@param taskExecutor         任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它//@throws NullPointerException     if either of threadFactory and unit is null//@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");//2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算//3.初始化时间轮wheelwheel = createWheel(ticksPerWheel);//mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能mask = wheel.length - 1;//4.校验tickDuration和ticksPerWheel//Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);//防止溢出//tickDuration * ticksPerWheel必须小于Long.MAX_VALUEif (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));}//tickDuration不能小于1msif (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}//5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;//6.给时间轮中任务的最大数量maxPendingTimeouts赋值this.maxPendingTimeouts = maxPendingTimeouts;//7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}//初始化时间轮环形数组//@param ticksPerWheelprivate static HashedWheelBucket[] createWheel(int ticksPerWheel) {//ticksPerWheel不能大于2^30checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");//将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);//创建时间轮环形数组HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}return wheel;}...
}

 

7.HashedWheelTimer添加任务和执行任务

(1)添加延时任务

(2)执行延时任务

 

(1)添加延时任务

步骤一:将需要执行的延时任务数pendingTimeouts+1

步骤二:如果pendingTimeouts超过maxPendingTimeouts,则抛出异常

步骤三:启动工作线程,也就是启动时间轮

步骤四:计算被添加的延时任务的截止时间=当前时间+当前任务执行的延迟时间-时间轮启动的时间

步骤五:创建延时任务实例HashedWheelTimeout

步骤六:将延时任务实例添加到延时任务队列timeouts中

注意:添加时会将延时任务添加到延时任务队列timeouts中。这个延时任务队列timeouts将会在下一个滴答声中进行处理(指针的下一次转动)。在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket。

public class HashedWheelTimer implements Timer {private final AtomicLong pendingTimeouts = new AtomicLong(0);//需要执行的延时任务数private final long maxPendingTimeouts;private volatile long startTime;private final CountDownLatch startTimeInitialized = new CountDownLatch(1);private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//延时任务队列...//添加延时任务//@param task 任务//@param delay 延时时间//@param unit 延时时间单位@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {checkNotNull(task, "task");checkNotNull(unit, "unit");//1.将需要执行的延时任务数pendingTimeouts + 1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); }//3.启动工作线程,即启动时间轮start();//将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)//在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket//4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}//5.创建延时任务实例HashedWheelTimeoutHashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);//6.将延时任务实例添加到延时任务队列中timeouts.add(timeout);return timeout;}//Starts the background thread explicitly.  //The background thread will start automatically on demand even if you did not call this method.//@throws IllegalStateException if this timer has been #stop() stopped alreadypublic void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {//启动工作线程,即启动时间轮workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}//Wait until the startTime is initialized by the worker.while (startTime == 0) {try {//阻塞时间轮的工作线程startTimeInitialized.await();} catch (InterruptedException ignore) {//Ignore - it will be ready very soon.}}}...
}

(2)执行延时任务

步骤一:记录时间轮启动的时间startTime

步骤二:开始do while循环,唤醒被阻塞的start()方法,通知时间轮已经启动完毕

步骤三:阻塞等待下一次指针转动的时间

步骤四:计算当前指针指向的时间轮的槽位idx

步骤五:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1

步骤六:获取当前指针指向的时间槽HashedWheelBucket

步骤七:遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中,根据延时时间计算对应的时间槽和remainingRounds圈数

步骤八:运行目前指针指向的时间槽中的链表的任务,通过taskExecutor线程池去执行到期的任务

步骤九:到期的和取消的延时任务从链表中移除并将pendingTimeouts--

步骤十:时间轮指针的总转动次数tick++,继续do while循环

步骤十一:清除时间轮中不需要处理的任务,保存到unprocessedTimeouts中

步骤十二:将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中

步骤十三:将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1

public class HashedWheelTimer implements Timer {private volatile long startTime;private final CountDownLatch startTimeInitialized = new CountDownLatch(1);...//指针转动和执行延时任务的线程private final class Worker implements Runnable {//用于记录未执行的延时任务private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();//总的tick数(指针嘀嗒的次数)private long tick;@Overridepublic void run() {//1.记录时间轮启动的时间startTimestartTime = System.nanoTime();if (startTime == 0) {//我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0startTime = 1;}//2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕startTimeInitialized.countDown();//一直执行do while循环,直到时间轮被关闭do {//3.阻塞等待下一次指针转动的时间//这里会休眠tick的时间,模拟指针走动final long deadline = waitForNextTick();if (deadline > 0) {//4.计算当前指针指向的时间轮槽位idxint idx = (int) (tick & mask);//5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1processCancelledTasks();//6.获取当前指针指向的时间槽HashedWheelBucketHashedWheelBucket bucket = wheel[idx];//7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中transferTimeoutsToBuckets();//8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务//9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--bucket.expireTimeouts(deadline);//10.时间轮指针的总转动次数tick++tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);//Fill the unprocessedTimeouts so we can return them from stop() method.//11.清除时间轮中不需要处理的任务for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}//12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中//遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {//如果延时任务没被取消,记录到未执行的任务Set集合中unprocessedTimeouts.add(timeout);}}//13.处理被取消的任务processCancelledTasks();}//将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置//也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中private void transferTimeoutsToBuckets() {//每次转移10w个延时任务for (int i = 0; i < 100000; i++) {//从队列中出队一个延时任务HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {//all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {//Was cancelled in the meantime.continue;}//到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔 long calculated = timeout.deadline / tickDuration;//tick已经走了的时间格,到期一共还需要需要走多少圈timeout.remainingRounds = (calculated - tick) / wheel.length;//如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行final long ticks = Math.max(calculated, tick);//槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);//根据索引该任务应该放到的槽HashedWheelBucket bucket = wheel[stopIndex];//将任务添加到槽中,链表末尾bucket.addTimeout(timeout);}}//处理取消掉的延时任务//将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1private void processCancelledTasks() {for (;;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {//all processedbreak;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}}//从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来private long waitForNextTick() {//deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔long deadline = tickDuration * (tick + 1);for (;;) {//计算当前时间距离启动时间的时间间隔final long currentTime = System.nanoTime() - startTime;//距离下一次指针跳动还需休眠多长时间long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//到了指针调到下一个槽位的时间if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}try {//表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}//记录未执行的延时任务public Set<Timeout> unprocessedTimeouts() {return Collections.unmodifiableSet(unprocessedTimeouts);}}private static final class HashedWheelBucket {...public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;//process all timeoutswhile (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {next = remove(timeout);if (timeout.deadline <= deadline) {//通过线程池执行任务timeout.expire();} else {//The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); }} else if (timeout.isCancelled()) {next = remove(timeout);} else {timeout.remainingRounds --;}timeout = next;}}public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;//remove timeout that was either processed or cancelled by updating the linked-listif (timeout.prev != null) {timeout.prev.next = next;}if (timeout.next != null) {timeout.next.prev = timeout.prev;}if (timeout == head) {//if timeout is also the tail we need to adjust the entry tooif (timeout == tail) {tail = null;head = null;} else {head = next;}} else if (timeout == tail) {//if the timeout is the tail modify the tail to be the prev node.tail = timeout.prev;}//null out prev, next and bucket to allow for GC.timeout.prev = null;timeout.next = null;timeout.bucket = null;timeout.timer.pendingTimeouts.decrementAndGet();return next;}...    }private static final class HashedWheelTimeout implements Timeout, Runnable {private final TimerTask task;private final HashedWheelTimer timer;...public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);}}}...}...
}

 

8.HashedWheelTimer的完整源码

//Netty时间轮
public class HashedWheelTimer implements Timer {static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class);private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();private static final int INSTANCE_COUNT_LIMIT = 64;private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(HashedWheelTimer.class, 1);private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");private final ResourceLeakTracker<HashedWheelTimer> leak;//指针转动和延时任务执行的线程private final Worker worker = new Worker();//worker任务封装的工作线程,用于指针转动和触发时间格里的延时任务的执行private final Thread workerThread;public static final int WORKER_STATE_INIT = 0;public static final int WORKER_STATE_STARTED = 1;public static final int WORKER_STATE_SHUTDOWN = 2;@SuppressWarnings({"unused", "FieldMayBeFinal"})private volatile int workerState;//0 - init, 1 - started, 2 - shut down//每个时间格的时间跨度,默认为100msprivate final long tickDuration;//时间轮(环形数组),HashedWheelBucket为每个时间格的槽private final HashedWheelBucket[] wheel;private final int mask;private final CountDownLatch startTimeInitialized = new CountDownLatch(1);//延时任务队列,队列中为等待被添加到时间轮的延时任务private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();//保存已经取消的延时任务的队列private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();//记录当前的任务数private final AtomicLong pendingTimeouts = new AtomicLong(0);//最大的任务数private final long maxPendingTimeouts;//执行延时任务的线程池private final Executor taskExecutor;//工作线程启动时间private volatile long startTime;////////////////////////// 构造器 start //////////////////////////public HashedWheelTimer() {this(Executors.defaultThreadFactory());}public HashedWheelTimer(long tickDuration, TimeUnit unit) {this(Executors.defaultThreadFactory(), tickDuration, unit);}public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);}//使用默认的tickDuration(时间格跨度默认为100ms)和默认的ticksPerWheel(时间格总数默认为512)创建一个新的计时器(时间轮)public HashedWheelTimer(ThreadFactory threadFactory) {this(threadFactory, 100, TimeUnit.MILLISECONDS);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {this(threadFactory, tickDuration, unit, 512);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {this(threadFactory, tickDuration, unit, ticksPerWheel, true);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);}public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, maxPendingTimeouts, ImmediateExecutor.INSTANCE);}//Creates a new timer.//@param threadFactory        创建线程的工厂//@param tickDuration         每格的时间间隔,默认100ms,0.1秒//@param unit                 时间单位,默认为毫秒//@param ticksPerWheel        时间轮的格子数,默认为512;如果传入的不是2的N次方,则会调整为大于等于该参数的第一个2的N次方,好处是可以优化hash值的计算 //@param leakDetection        如果false,那么只有工作线程不是后台线程时才会追踪资源泄露,这个参数可以忽略//@param maxPendingTimeouts   最大的pending数量(时间轮中任务的最大数量),超过这个值之后调用将抛出异常,0或者负数表示没有限制,默认为-1//@param taskExecutor         任务线程池,用于执行提交的任务,调用者负责在不需要时关闭它//@throws NullPointerException     if either of threadFactory and unit is null//@throws IllegalArgumentException if either of tickDuration and ticksPerWheel is <= 0public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { //1.构造参数校验及给实际执行延时任务的线程池taskExecutor赋值checkNotNull(threadFactory, "threadFactory");checkNotNull(unit, "unit");checkPositive(tickDuration, "tickDuration");checkPositive(ticksPerWheel, "ticksPerWheel");this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");//2.将ticksPerWheel(时间轮上的时间格数)向上取值为2的幂,方便进行求商和取余计算//3.初始化时间轮wheelwheel = createWheel(ticksPerWheel);//mask的设计和HashMap一样,通过限制数组的大小为2的幂,利用位运算来替代取模运算,提高性能mask = wheel.length - 1;//4.校验tickDuration和ticksPerWheel//Convert tickDuration to nanos.long duration = unit.toNanos(tickDuration);//防止溢出//tickDuration * ticksPerWheel必须小于Long.MAX_VALUEif (duration >= Long.MAX_VALUE / wheel.length) {throw new IllegalArgumentException(String.format("tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length));}//tickDuration不能小于1msif (duration < MILLISECOND_NANOS) {logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS);this.tickDuration = MILLISECOND_NANOS;} else {this.tickDuration = duration;}//5.创建工作线程,用于指针转动和触发时间格里的延时任务的执行workerThread = threadFactory.newThread(worker);leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;//6.给时间轮中任务的最大数量maxPendingTimeouts赋值this.maxPendingTimeouts = maxPendingTimeouts;//7.检查HashedWheelTimer的实例数量,如果大于64则打印error日志if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();}}////////////////////////// 构造器 end //////////////////////////@Overrideprotected void finalize() throws Throwable {try {super.finalize();} finally {//This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. //If we have not yet shutdown then we want to make sure we decrement the active instance count.if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();}}}//初始化时间轮环形数组//@param ticksPerWheelprivate static HashedWheelBucket[] createWheel(int ticksPerWheel) {//ticksPerWheel不能大于2^30checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");//将ticksPerWheel(轮子上的时间格数)向上取值为2的次幂ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);//创建时间轮环形数组HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i ++) {wheel[i] = new HashedWheelBucket();}return wheel;}//将ticksPerWheel(时间轮上的时间格数)向上取值为2的次幂private static int normalizeTicksPerWheel(int ticksPerWheel) {int normalizedTicksPerWheel = 1;while (normalizedTicksPerWheel < ticksPerWheel) {normalizedTicksPerWheel <<= 1;}return normalizedTicksPerWheel;}//显式启动后台线程//即使没有调用此方法,后台线程也会按需自动启动//Starts the background thread explicitly.  //The background thread will start automatically on demand even if you did not call this method.//@throws IllegalStateException if this timer has been #stop() stopped alreadypublic void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {//启动工作线程,即启动时间轮workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException("cannot be started once stopped");default:throw new Error("Invalid WorkerState");}//Wait until the startTime is initialized by the worker.while (startTime == 0) {try {//阻塞时间轮的工作线程startTimeInitialized.await();} catch (InterruptedException ignore) {//Ignore - it will be ready very soon.}}}@Overridepublic Set<Timeout> stop() {if (Thread.currentThread() == workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName());}if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {//workerState can be 0 or 2 at this moment - let it always be 2.if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}return Collections.emptySet();}try {boolean interrupted = false;while (workerThread.isAlive()) {workerThread.interrupt();try {workerThread.join(100);} catch (InterruptedException ignored) {interrupted = true;}}if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak != null) {boolean closed = leak.close(this);assert closed;}}return worker.unprocessedTimeouts();}//添加延时任务//@param task 任务//@param delay 延时时间//@param unit 延时时间单位@Overridepublic Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {checkNotNull(task, "task");checkNotNull(unit, "unit");//1.将需要执行的延时任务数pendingTimeouts + 1long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();//2.如果pendingTimeouts超过maxPendingTimeouts,则抛出异常if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); }//3.启动工作线程,即启动时间轮start();//将延时任务添加到延时任务队列timeouts中,该队列将在下一个滴答声中处理(指针的下一次转动)//在处理过程中,所有排队的HashedWheelTimeout将被添加到正确的HashedWheelBucket//4.计算任务的截止时间deadline = 当前时间 + 当前任务执行的延迟时间 - 时间轮启动的时间long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;if (delay > 0 && deadline < 0) {deadline = Long.MAX_VALUE;}//5.创建延时任务实例HashedWheelTimeoutHashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);//6.将延时任务实例添加到延时任务队列中timeouts.add(timeout);return timeout;}//Returns the number of pending timeouts of this Timer.public long pendingTimeouts() {return pendingTimeouts.get();}private static void reportTooManyInstances() {if (logger.isErrorEnabled()) {String resourceType = simpleClassName(HashedWheelTimer.class);logger.error("You are creating too many " + resourceType + " instances. " +resourceType + " is a shared resource that must be reused across the JVM, " +"so that only a few instances are created.");}}//指针转动和延时任务执行的线程private final class Worker implements Runnable {//用于记录未执行的延时任务private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();//总的tick数(指针嘀嗒的次数)private long tick;@Overridepublic void run() {//1.记录时间轮启动的时间startTimestartTime = System.nanoTime();if (startTime == 0) {//我们在这里使用0作为未初始化值的指示符,所以要确保初始化时它不是0startTime = 1;}//2.唤醒被阻塞的start()方法,通知时间轮已经启动完毕startTimeInitialized.countDown();//一直执行do while循环,直到时间轮被关闭do {//3.阻塞等待下一次指针转动的时间//这里会休眠tick的时间,模拟指针走动final long deadline = waitForNextTick();if (deadline > 0) {//4.计算当前指针指向的时间轮槽位idxint idx = (int) (tick & mask);//5.将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1processCancelledTasks();//6.获取当前指针指向的时间槽HashedWheelBucketHashedWheelBucket bucket = wheel[idx];//7.遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中transferTimeoutsToBuckets();//8.运行目前指针指向的槽中的链表的任务,交给taskExecutor线程池去执行到期的延时任务//9.到期的和取消的延时任务从链表中移除并将pendingTimeouts--bucket.expireTimeouts(deadline);//10.时间轮指针的总转动次数tick++tick++;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);//Fill the unprocessedTimeouts so we can return them from stop() method.//11.清除时间轮中不需要处理的任务for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}//12.将延时任务队列中还未添加到时间轮的延时任务保存到unprocessedTimeouts中//遍历任务队列,如果发现有任务被取消,则添加到unprocessedTimeouts,也就是不需要处理的队列中for (;;) {HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {break;}if (!timeout.isCancelled()) {//如果延时任务没被取消,记录到未执行的任务Set集合中unprocessedTimeouts.add(timeout);}}//13.处理被取消的任务processCancelledTasks();}//将延时任务队列timeouts中等待添加到时间轮中的延时任务,转移到时间轮的指定位置//也就是遍历延时任务队列timeouts,将其中的延时任务保存到对应的槽的链表中private void transferTimeoutsToBuckets() {//每次转移10w个延时任务for (int i = 0; i < 100000; i++) {//从队列中出队一个延时任务HashedWheelTimeout timeout = timeouts.poll();if (timeout == null) {//all processedbreak;}if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {//Was cancelled in the meantime.continue;}//到期一共需要走多少时间格(tick次数),deadline表示当前任务的延迟时间(从时间轮启动时计算),tickDuration表示时间格的时间间隔 long calculated = timeout.deadline / tickDuration;//tick已经走了的时间格,到期一共还需要需要走多少圈timeout.remainingRounds = (calculated - tick) / wheel.length;//如果延时任务在队列中等待太久已经过了执行时间,那么这个时候就使用当前tick,也就是放在当前的bucket,此方法调用完后就会被执行final long ticks = Math.max(calculated, tick);//槽的索引,stopIndex = tick 次数 & mask, mask = wheel.length - 1int stopIndex = (int) (ticks & mask);//根据索引该任务应该放到的槽HashedWheelBucket bucket = wheel[stopIndex];//将任务添加到槽中,链表末尾bucket.addTimeout(timeout);}}//处理取消掉的延时任务//将已经取消的任务从HashedWheelBucket数组中移除,并将pendingTimeouts任务数 - 1private void processCancelledTasks() {for (;;) {HashedWheelTimeout timeout = cancelledTimeouts.poll();if (timeout == null) {//all processedbreak;}try {timeout.remove();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while process a cancellation task", t);}}}}//从时间轮的启动时间startTime和当前的tick数(指针跳动次数)计算下一次指针跳动的时间,然后休眠等待下一次指针跳动时间到来private long waitForNextTick() {//deadline返回的是下一次时间轮指针跳动的时间与时间格启动的时间间隔long deadline = tickDuration * (tick + 1);for (;;) {//计算当前时间距离启动时间的时间间隔final long currentTime = System.nanoTime() - startTime;//距离下一次指针跳动还需休眠多长时间long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;//到了指针调到下一个槽位的时间if (sleepTimeMs <= 0) {if (currentTime == Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}try {//表示距离下一次指针跳动还需要一段时间,所以休眠等待时间的到来Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}}}//记录未执行的延时任务public Set<Timeout> unprocessedTimeouts() {return Collections.unmodifiableSet(unprocessedTimeouts);}}//延时任务private static final class HashedWheelTimeout implements Timeout, Runnable {private static final int ST_INIT = 0;private static final int ST_CANCELLED = 1;private static final int ST_EXPIRED = 2;private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");private final HashedWheelTimer timer;private final TimerTask task;//任务执行的截止时间 = 当前时间 + 延时任务延时时间 - 时间轮启动时间private final long deadline;@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })private volatile int state = ST_INIT;//剩下的圈(轮)数//remainingRounds将由Worker.transferTimeoutsToBuckets()在HashedWheelTimeout被添加到正确的HashedWheelBucket之前计算和设置long remainingRounds;//HashedWheelTimerBucket槽中的延时任务列表是一个双向链表//因为只有workerThread会对它进行操作,所以不需要 synchronization / volatileHashedWheelTimeout next;HashedWheelTimeout prev;//当前延时任务所插入时间轮的哪个槽HashedWheelBucket bucket;HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {this.timer = timer;this.task = task;this.deadline = deadline;}@Overridepublic Timer timer() {return timer;}@Overridepublic TimerTask task() {return task;}@Overridepublic boolean cancel() {//only update the state it will be removed from HashedWheelBucket on next tick.if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {return false;}//If a task should be canceled we put this to another queue which will be processed on each tick.//So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way//we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.timer.cancelledTimeouts.add(this);return true;}void remove() {HashedWheelBucket bucket = this.bucket;if (bucket != null) {bucket.remove(this);} else {timer.pendingTimeouts.decrementAndGet();}}public boolean compareAndSetState(int expected, int state) {return STATE_UPDATER.compareAndSet(this, expected, state);}public int state() {return state;}@Overridepublic boolean isCancelled() {return state() == ST_CANCELLED;}@Overridepublic boolean isExpired() {return state() == ST_EXPIRED;}public void expire() {if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {return;}try {timer.taskExecutor.execute(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName() + " for execution.", t);}}}@Overridepublic void run() {try {task.run(this);} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);}}}@Overridepublic String toString() {final long currentTime = System.nanoTime();long remaining = deadline - currentTime + timer.startTime;StringBuilder buf = new StringBuilder(192).append(simpleClassName(this)).append('(').append("deadline: ");if (remaining > 0) {buf.append(remaining).append(" ns later");} else if (remaining < 0) {buf.append(-remaining).append(" ns ago");} else {buf.append("now");}if (isCancelled()) {buf.append(", cancelled");}return buf.append(", task: ").append(task()).append(')').toString();}}//存放HashedWheelTimeouts的桶//这些数据存储在一个类似于链表的数据结构中,允许轻松删除中间的hashedwheeltimeout//HashedWheelTimeout本身作为节点,因此不需要创建额外的对象//保存头结点和尾节点,方便于任务的提取和插入private static final class HashedWheelBucket {//头结点private HashedWheelTimeout head;//尾节点private HashedWheelTimeout tail;//Add HashedWheelTimeout to this bucket.public void addTimeout(HashedWheelTimeout timeout) {assert timeout.bucket == null;timeout.bucket = this;if (head == null) {head = tail = timeout;} else {tail.next = timeout;timeout.prev = tail;tail = timeout;}}//Expire all HashedWheelTimeouts for the given deadline.public void expireTimeouts(long deadline) {HashedWheelTimeout timeout = head;//遍历当前时间槽中的所有任务while (timeout != null) {HashedWheelTimeout next = timeout.next;if (timeout.remainingRounds <= 0) {//从链表中移除next = remove(timeout);if (timeout.deadline <= deadline) {//延时任务到期,执行延时任务timeout.expire();} else {//The timeout was placed into a wrong slot. This should never happen.throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));}//如果延时任务取消,从链表中移除} else if (timeout.isCancelled()) {next = remove(timeout);} else {//任务还没到期,剩余的轮数-1timeout.remainingRounds --;}//将指针放置到下一个延时任务上timeout = next;}}//删除槽中链表中的延时任务public HashedWheelTimeout remove(HashedWheelTimeout timeout) {HashedWheelTimeout next = timeout.next;//remove timeout that was either processed or cancelled by updating the linked-listif (timeout.prev != null) {timeout.prev.next = next;}if (timeout.next != null) {timeout.next.prev = timeout.prev;}if (timeout == head) {//if timeout is also the tail we need to adjust the entry tooif (timeout == tail) {tail = null;head = null;} else {head = next;}} else if (timeout == tail) {//if the timeout is the tail modify the tail to be the prev node.tail = timeout.prev;}//null out prev, next and bucket to allow for GC.timeout.prev = null;timeout.next = null;timeout.bucket = null;timeout.timer.pendingTimeouts.decrementAndGet();return next;}//Clear this bucket and return all not expired / cancelled Timeouts.public void clearTimeouts(Set<Timeout> set) {for (;;) {HashedWheelTimeout timeout = pollTimeout();if (timeout == null) {return;}if (timeout.isExpired() || timeout.isCancelled()) {continue;}set.add(timeout);}}//头结点移除private HashedWheelTimeout pollTimeout() {HashedWheelTimeout head = this.head;if (head == null) {return null;}HashedWheelTimeout next = head.next;if (next == null) {tail = this.head =  null;} else {this.head = next;next.prev = null;}//null out prev and next to allow for GC.head.next = null;head.prev = null;head.bucket = null;return head;}}
}

 

9.HashedWheelTimer的总结

一.时间轮的转动是单线程

但是时间轮中每个时间槽里的延时任务则是由线程池来执行的。

 

二.延时任务保存到JVM中没有做宕机备份

系统重启时延时任务将会丢失,无法恢复任务进行重新调度。

 

三.时间轮调度器的时间精度不是很高

对于精度要求特别高的调度任务可能不太适合,因为时间轮的精度取决于时间格的跨度大小。

 

四.时间轮指针的转动是使用Sleep来完成等待的

 

10.HashedWheelTimer的应用

(1)时间轮的应用场景

一.Dubbo、Netty、Kafka、Redission等中间件都用到了时间轮机制

二.订单关闭、确认收货、批量定时数据更新等都可以采用时间轮机制

 

(2)心跳检测

心跳机制会每隔固定的时间发送一个心跳包来检测客户端和服务端的连接状态,客户端发送心跳包用来告诉服务器其还正常运行。

 

比如在Dubbo中,需要有心跳机制来维持Consumer与Provider的长连接,默认的心跳间隔是60s。当Provider在3次心跳时间内没有收到心跳响应,会关闭连接通道。当Consumer在3次心跳时间内没有收到心跳响应,会进行重连。

 

在Dubbo的HeaderExchangeClient类中会向时间轮中提交该心跳任务:

一.发送心跳的时间轮

private static final HashedWheelTimer IDLE_CHECK_TIMER =new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);

二.向时间轮中提交心跳任务

private void startHeartBeatTask(URL url) {//Client的具体实现决定是否启动该心跳任务if (!client.canHandleIdle()) {AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);//计算心跳间隔, 最小间隔不能低于1sint heartbeat = getHeartbeat(url);long heartbeatTick = calculateLeastDuration(heartbeat);//创建心跳任务this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);//提交到IDLE_CHECK_TIMER这个时间轮中等待执行, 等时间到了时间轮就会去取出该任务进行调度执行IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);}
}

(3)超时处理

在Dubbo中发起RPC调用时,通常会配置超时时间,当消费者调用服务提供者出现超时进行一定的逻辑处理。那么怎么检测任务调用超时了呢?我们可以利用定时任务。

 

每次发起RPC调用时创建一个Future,记录这个Future的创建时间与超时时间,后台有一个定时任务进行检测。当Future到达超时时间并且没有被处理时,就需要对这个Future执行超时逻辑处理。

 

(4)Redisson分布式锁续期

Redisson看门狗机制,通过时间轮定时给分布式锁续期。在获取锁成功后,Redisson会封装一个锁续期的延时任务放入到时间轮中。默认10秒检查一下,用于对获取到的锁进行续期,延长持有锁的时间。如果业务机器宕机了,那么续期的延时任务失效,也无法续期,锁会超时释放。

 

一.添加续期延时任务

private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}//这边newTimeout点进去发现就是往时间轮中提交了一个任务Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {//续期成功后继续调度, 又往时间轮中放一个续期任务renewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

二.lua续期代码

protected RFuture<Boolean> renewExpirationAsync(long threadId) {//通过lua脚本对锁进行续期return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/908364.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0330-好的开始是成功的一半

前言 今天帮一个 USC Game Dev 专业同学做项目,真的挺复杂的一个项目。 但是我依然把项目配置好了,后面就是慢慢的添加新功能。 我用了 git 管理这个项目,把自己的每一步关键操作都用 git commit 记录一下。效果心路历程 我想过很多次 “要不就放弃吧” 但是看到旁边的“Att…

数仓项目建设方案——维度建模

数仓项目建设方案——维度建模式信息收集项目背景 阐述公司当前的行业,涉及的主要业务,相关数据的大小、分布、更新情况描述,需要解决的相关问题。公司当前数据建设现状 使用的数据库、数据来源系统与方式、现有数据分析组织,所使用的 BI 工具与数仓工具、为什么建立以及当…

在机器人和无人机时代,测绘人的出路在哪里?

一、技术革命:当测绘行业按下“加速键”无人机与机器人技术正在重塑测绘行业的底层逻辑。传统测绘依赖人工作业,效率低、成本高且风险大,而无人机凭借其灵活性和高效性,已能快速完成大范围地形测绘,精度可达厘米级,甚至替代人工进入危险区域(如塌方、悬崖等)作业。例如…

openwrt禁止设备联网

一、代码操作 把mac地址换成要禁用的设备mac地址,加到自定义防火墙最后,记得最后重启防火墙生效 /etc/init.d/firewall restart iptables -I INPUT -m mac --mac-source B8:C7:4A:7A:66:2E -j DROP iptables -I FORWARD -m mac --mac-source B8:C7:4A:7A:66:2E -j DROP iptab…

JVM调优原理篇

JVM调优 什么是JVM调优,调优的指标是什么? JVM调优指的就是对当前系统进行性能调优,简单来说就是尽可能使用较小的内存和CPU来让JAVA程序获得更高的吞吐量及较低的延迟。 调优常见的指标:吞吐量:是指不考虑垃圾收集引起的停顿时间或内存消耗,应用达到的最高性能指标。 延…

20241216 实验二《Python程序设计》实验报告

20241216 2024-2025-2 《Python程序设计》实验二报告 课程:《Python程序设计》 班级: 2412 姓名: 曾楷 学号:20241216 实验教师:王志强 实验日期:2024年3月26日 必修/选修: 公选课 (一) 实验内容 1.设计并完成一个完整的应用程序,完成加减乘除模等运算,功能多多益善…

[Windows] TechSweeper 应用程序卸载神器V1.2.1

一.我们改进了程序元素显示,现在超出列宽,可以进行滚动显示二.我们为程序添加了右键菜单,现在功能更加全面三.现在程序出现崩溃时,可以进行错误提示与收集四.现在程序可以进行主题切换了五.添加了作者相关信息六.现在可以打开程序相关注册表了(直接显示 清晰明了)七.现在可…

鸢尾花书 - Book_3《数学要素》 - Chapter1 万物皆数

上面图片摘自原书 一、基础概念普及 1. 向量 若干数字排成一行或一列,并且用中括号括起来,得到的数组叫做向量。 2. 行向量 排成一行 3. 列向量 排成一列 4. 转置 行向量转置得到列向量,反之。 5. 矩阵 有行,有列,像表格。 6. 元素 x[i][j] 代表矩阵 X 中第 i 行第 j 列元…

VMware workstation 17 pro 设置开机自启虚拟机(Windows 11)

首先在软件界面设置需要启动的虚拟机 文件➡配置自动启动的虚拟机在打开的对话框中勾选需要自动启动的虚拟机和设置启动顺序点击确定即可。如果点击确定报错的话,如下图看起来问题可能出在两个方面:要么是vmAutoStart.xml文件不存在,要么是当前用户没有足够的权限去修改这个…

RabbitMQ进阶--集群,分布式事务

一.RabbitMQ集群搭建 RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方…

企业为何要使用odoo18

在当今快速变化的商业环境中,企业需要高效、灵活且经济实惠的管理工具来保持竞争力。Odoo 18 作为一款开源的企业资源计划(ERP)系统,凭借其全面的功能和独特的优势,成为众多企业的首选。为什么选择 Odoo 18? 1. 全面的功能覆盖 Odoo 18 集成了销售、采购、库存、制造、财…

deepclaude 的使用 直捣黄龙

先说结论,最简单的就是购买deepseek 和 claude 的api之后, 直接使用vscode中的cline插件,其中 cline的plan(计划模式)配置deepseek的api,act(执行模型)配置claude的api 直接上连接: deepseek开放平台 https://platform.deepseek.com/ claude 开放平台 https://con…