业务场景
业务开发过程中,我们经常会需要判断远程终端是否在线,当终端离线的时候我们需要发送消息告知相应的系统,
环形队列
1.创建一个index从0到30的环形队列(本质是个数组)
2.环上每一个slot是一个Set,任务集合
3.同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里
4.启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
5.有一个Current Index指针来标识刚检测过的slot
6.接收到设备心跳后将寻找到原来uid的位置然后移动到当前指针的后一位,并删除原来slot里的uid
7.这样就可以快速获取超时的设备uid
环形队列实现
package com.zngx.admin.circle;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** @Author : zhiying* @Date : 2022-11-22 15:17* @Desc : 环形队列 - 设备离线判定* 1、预设一个长度为10000的数组(按实际业务定义长度)* 2、每个数组存放一个Set集合* 3、维护一个游标cur,从0到9999递增,到达9999时,重置为0(启动一个线程执行)* 4、维护一个map,记录所有设备ID存放的数组位置,方便查找* 5、监听到设备心跳时,先将原来的数据从指定位置的集合中删除,通过计算当前游标位置和keepAlive寻找合适的位置将设备ID放入* 6、当游标指向某个位置a时,a位置的集合中的所有设备全部判定为离线,并清空该位置的集合**/public class CircleQueue<T> {//线程安全锁Lock lock = new ReentrantLock();//初始环形队列大小private int capacity = 10000;//当前环形队列所在节点private volatile int currentIndex = 0;//数据所在节点private Map<T,Integer> dataIndex = new HashMap<>();//环形队列private Set<T>[] array;public CircleQueue(){array = new HashSet[capacity];}public CircleQueue(int capacity){this.capacity = capacity;array = new HashSet[capacity];}/*** 向环形队列中添加元素* @param t* @param offset 偏移量,基于游标*/public void add(T t, int offset){int index = currentIndex + offset;if(index >= capacity){index = index - capacity;}try {lock.lock();//判断数据是否存在if(dataIndex.containsKey(t)){Set<T> old = array[dataIndex.get(t)];old.remove(t);}//获取当前节点的队列Set<T> set = array[index];if(null == set){set = new HashSet<>();array[index] = set;}set.add(t);//更新新的节点位置dataIndex.put(t,index);}catch (Exception e){e.printStackTrace();}finally {lock.unlock();}}/*** 下移一格,到9999重新置为0*/public void next(){int cur = currentIndex + 1;if(cur >= capacity){cur = cur - capacity;}currentIndex = cur;System.out.println("当前游标位置:" + currentIndex);}/*** 获取当前游标指向的元素集合* @return*/public Set<T> getAndDeleteData(){Set<T> set = null;try {lock.lock();set = array[currentIndex];return set;}finally {// 将集合中所有的元素移除array[currentIndex] = new HashSet<>();if(set != null && set.size()>0){set.forEach(t -> {dataIndex.remove(t);});}lock.unlock();}}public int getIndex(T t){if(dataIndex.containsKey(t)){return dataIndex.get(t);}return -1;}
}
测试代码
@Testpublic void circleTest(){CircleQueue<String> circleQueue = new CircleQueue<>();for (int i=0;i<1000;i++){String uuid = String.valueOf(i+1);int offset = (int) Math.round(Math.random()*10);circleQueue.add(uuid, offset);}checkTimeout(circleQueue);insertDataRandom(circleQueue);try {Thread.sleep(600000);}catch (Exception e){e.printStackTrace();}}private void checkTimeout(CircleQueue<String> circleQueue){service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {Set<String> set = circleQueue.getAndDeleteData();if(set == null || set.isEmpty()) {System.out.println("本次没有设备离线");}else{System.out.println("这些设备离线啦:" + Joiner.on(",").join(set));}circleQueue.next();}},2,1, TimeUnit.SECONDS);}private void insertDataRandom(CircleQueue<String> circleQueue){service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {String deviceId = String.valueOf(Math.round(Math.random()*100000));int offset = (int) Math.round(Math.random()*10);circleQueue.add(deviceId, offset);System.out.println("插入设备["+deviceId+"], " + offset + "秒后离线");}},3,3, TimeUnit.SECONDS);}
测试结果
148aed53-a083-40a6-82d3-ede14e5e39c9初始时间1585571543739
当前位置:0数据大小:0
当前位置:1数据大小:0
当前位置:2数据大小:0
当前位置:3数据大小:0
当前位置:4数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:5数据大小:0
当前位置:6数据大小:0
当前位置:7数据大小:0
当前位置:8数据大小:0
当前位置:9数据大小:0
当前位置:10数据大小:0
当前位置:11数据大小:0
当前位置:12数据大小:0
当前位置:13数据大小:0
当前位置:14数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:15数据大小:0
当前位置:16数据大小:0
当前位置:17数据大小:0
当前位置:18数据大小:0
当前位置:19数据大小:0
当前位置:20数据大小:0
当前位置:21数据大小:0
当前位置:22数据大小:0
当前位置:23数据大小:0
当前位置:24数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:25数据大小:0
当前位置:26数据大小:0
当前位置:27数据大小:0
当前位置:28数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9过期
超时时间30005