1.设计
1.1 背景
系统启动后,所有任务都在被执行,如果这时某个节点宕机,那它负责的任务就不能执行了,这对有稳定性要求的任务是不能接受的,所以系统要实现rebalance的功能。
1.2 设计
下面是Job分配与执行的业务点,重分配就是在 follower下线、controller下线、节点新上线进行重分配。理清楚接下来实现就是水到渠成了
2. 实现
2.1 RebalanceJobType
定义了重平衡job的类型
public enum RebalanceJobType {FOLLOWER_OFFLINE(0), CONTROLLER_OFFLINE(1), NODE_ONLINE(2);private int code;RebalanceJobType(int code) {this.code = code;}public boolean isFollowerOffline() {return this.code == FOLLOWER_OFFLINE.code;}public boolean isControllerOffline() {return this.code == CONTROLLER_OFFLINE.code;}public boolean isNodeOnline() {return this.code == NODE_ONLINE.code;}}
2.2 AverageJobAllotStrategy
添加了 rebalanceJob的方法,只有Controller才能调用,对不同的重平衡情况进行分别处理
private Map<Long, List<DttaskJob>> getDttaskJobMap() {List<DttaskJob> allDttaskJob = getAllDttaskJob();return average(allDttaskJob);
}@Override
public void rebalanceJob(RebalanceJobContext rebalanceJobContext) {if (rebalanceJobContext.getType().isFollowerOffline()|| rebalanceJobContext.getType().isControllerOffline()) {long offlineServerId = rebalanceJobContext.getServerId();log.info("{}节点={}下线->重平衡job={}",rebalanceJobContext.getType().isFollowerOffline() ? "follower" : "controller",offlineServerId,rebalanceJobContext);List<DttaskJob> dttaskJobs = getByDttaskId(offlineServerId);List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();Map<Long, List<DttaskJob>> allotMap = new HashMap<>();int i = 0;int nodeCount = nodeInfoList.size();while (i < dttaskJobs.size()) {DttaskJob dttaskJob = dttaskJobs.get(i);NodeInfo nodeInfo = nodeInfoList.get(i % nodeCount);i++;List<DttaskJob> dttaskJobList = allotMap.getOrDefault(nodeInfo.getServerId(), new ArrayList<>());dttaskJobList.add(dttaskJob);allotMap.put(nodeInfo.getServerId(), dttaskJobList);}executeDttaskJob(new ExecuteDttaskJobContext(allotMap, true));} else if (rebalanceJobContext.getType().isNodeOnline()) {log.info("节点上线->重平衡job={}", rebalanceJobContext);long onlineServerId = rebalanceJobContext.getServerId();Map<Long, List<DttaskJob>> dttaskJobMap = BeanUseHelper.entityHelpService().queryDttaskJob();Map<Long, List<DttaskJob>> allotDttaskJobMap = getDttaskJobMap();Map<Long, List<DttaskJob>> stopDttaskJobMapOfOldNodes = new HashMap<>();Map<Long, List<DttaskJob>> startDttaskJobMapOfNewNodes = new HashMap<>();List<DttaskJob> startDttaskJobs = new ArrayList<>();dttaskJobMap.forEach((serverId, dttaskJobList) -> {int size = dttaskJobList.size();int newSize = allotDttaskJobMap.get(serverId).size();if (size > newSize) {List<DttaskJob> dttaskJobs = dttaskJobList.subList(0, size - newSize);stopDttaskJobMapOfOldNodes.put(serverId, dttaskJobs);startDttaskJobs.addAll(dttaskJobs);}});startDttaskJobMapOfNewNodes.put(onlineServerId, startDttaskJobs);executeDttaskJob(new ExecuteDttaskJobContext(stopDttaskJobMapOfOldNodes, false));executeDttaskJob(new ExecuteDttaskJobContext(startDttaskJobMapOfNewNodes, true));}
}
2.3 ServerClientChannelHandler
对节点下线进行重平衡处理
2.4 NodeOnlineMessageService
3. 测试
启动三个节点,节点完成选举,每个节点执行2个任务
- 3号节点下线
1 2 节点各分配了一个任务继续执行
- 3号节点上线
新上线的3号节点,重新得到2个任务,1 2节点各停止一个任务
至此,节点上下线的任务重平衡完成