前言
Sentinel有pull(拉)模式,和push(推)模式。本文是使用reids实现pull模式。
通过SPI机制引入自己的类
在项目的 resources > META-INF > services下创建新文件,文件名如下,内容是自己实现类的全限定名:com.xx.sentinel.RedisDataSourceInit
创建InitFunc实现类
package com.xx.sentinel;import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xx.schedule.Utils.SpringUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;import java.util.List;/*** @Author: codeAgent <br>* @CreateDate: 2023/07/01 11:14 <br>* @Description: sentinel初始化方法,进行持久化设置*/
public class RedisDataSourceInit implements InitFunc {/*** sentinel存放redis的key。例如:sentinel:common:flow*/private static final String SENTINEL_REDIS_KEY = "sentinel:%s:%s";/*** sentinel数据更新发布订阅频道。例如:chanel_sentinel_common_flow*/private static final String SENTINEL_REDIS_CHANEL = "chanel_sentinel_%s_%s";private RedisTemplate redisTemplate;private static final String RULE_FLOW = "flow";private static final String RULE_DEGRADE = "degrade";/*** 给模板对象RedisTemplate赋值,并传出去*/private RedisTemplate<String, Object> getRedisTemplate() {if (redisTemplate == null) {synchronized (this) {if (redisTemplate == null) {redisTemplate = SpringUtil.getBean("functionDomainRedisTemplate");}}}return redisTemplate;}/*** 获取sentinel存放redis的key** @param ruleType* @return*/private String getSentinelRedisKey(String ruleType) {String projectName = SentinelRedisHelper.getProjectName();return String.format(SENTINEL_REDIS_KEY, projectName, ruleType);}/*** 获取sentinel数据更新发布订阅频道** @param ruleType* @return*/private String getSentinelRedisChanel(String ruleType) {String projectName = SentinelRedisHelper.getProjectName();return String.format(SENTINEL_REDIS_CHANEL, projectName, ruleType);}@Overridepublic void init() throws Exception {// 没有配置redis或没有配置projectName则不进行持久化配置if (getRedisTemplate() == null || StringUtils.isEmpty(SentinelRedisHelper.getProjectName())) {return;}// 1.处理流控规则this.dealFlowRules();// 2.处理熔断规则this.dealDegradeRules();}/*** 处理流控规则*/private void dealFlowRules() {String redisFlowKey = getSentinelRedisKey(RULE_FLOW);String redisFlowChanel = getSentinelRedisChanel(RULE_FLOW);// 注册flow读取规则// 官方RedisDataSource是订阅获取,官方FileRefreshableDataSource是定时刷新获取。本方法是redis订阅+定时Converter<String, List<FlowRule>> parser = source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {});ReadableDataSource<String, List<FlowRule>> redisFlowDataSource = new RedisDataSource2<>(parser, getRedisTemplate(), redisFlowKey, redisFlowChanel);FlowRuleManager.register2Property(redisFlowDataSource.getProperty());// 初始化加载一次所有flow规则String flowRulesStr = (String) getRedisTemplate().opsForValue().get(redisFlowKey);List<FlowRule> flowRuleList = parser.convert(flowRulesStr);redisFlowDataSource.getProperty().updateValue(flowRuleList);// 注册flow写入规则。这样收到控制台推送的规则时,Sentinel 会先更新到内存,然后将规则写入到文件中.WritableDataSource<List<FlowRule>> flowRuleWds = new RedisWritableDataSource<>(JSON::toJSONString, getRedisTemplate(), redisFlowKey, redisFlowChanel);WritableDataSourceRegistry.registerFlowDataSource(flowRuleWds);}/*** 处理熔断规则*/public void dealDegradeRules() {String redisDegradeKey = getSentinelRedisKey(RULE_DEGRADE);String redisDegradeChanel = getSentinelRedisChanel(RULE_DEGRADE);Converter<String, List<DegradeRule>> parser = source -> JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {});ReadableDataSource<String, List<DegradeRule>> redisDegradeDataSource = new RedisDataSource2<>(parser, getRedisTemplate(), redisDegradeKey, redisDegradeChanel);DegradeRuleManager.register2Property(redisDegradeDataSource.getProperty());// 初始化加载一次所有flow规则String degradeRulesStr = (String) getRedisTemplate().opsForValue().get(redisDegradeKey);List<DegradeRule> degradeRuleList = parser.convert(degradeRulesStr);redisDegradeDataSource.getProperty().updateValue(degradeRuleList);// 注册degrade写入规则。这样收到控制台推送的规则时,Sentinel 会先更新到内存,然后将规则写入到文件中.WritableDataSource<List<DegradeRule>> degradeRuleWds = new RedisWritableDataSource<>(JSON::toJSONString, getRedisTemplate(), redisDegradeKey, redisDegradeChanel);WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWds);}
}
创建读取redis类
继承AutoRefreshDataSource类,可以实现定时读取redis数据,防止redis发布订阅没收到时,更新不到新数据,相当于兜底方案。
package com.XX.sentinel;import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.util.AssertUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;import java.nio.charset.StandardCharsets;/*** @Author: codeAgent <br>* @CreateDate: 2023/07/04 11:09 <br>* @Description: 订阅redis通知,当sentinel规则发生变化时,拉取redis配置保存到内存。定时获取redis信息*/
public class RedisDataSource2<T> extends AutoRefreshDataSource<String, T> {private static Logger logger = LoggerFactory.getLogger(RedisDataSource2.class);private static final String REDIS_SUCCESS_MSG = "OK";private String lastModified = "-1";private RedisTemplate redisTemplate;/*** 存入redis的对应规则的key*/private String ruleKey;/*** redis订阅频道*/private String channel;/*** 存入redis更新时间的key*/private String ruleUpdateKey;/*** 定时获取redis信息*/private static final long DEFAULT_REFRESH_MS = 300000L;public RedisDataSource2(Converter<String, T> parser, RedisTemplate redisTemplate, String ruleKey, String channel) {// 父级构造器,传入定时执行的时间super(parser, DEFAULT_REFRESH_MS);AssertUtil.notNull(redisTemplate, "redisTemplate can not be null");AssertUtil.notEmpty(ruleKey, "redis ruleKey can not be empty");AssertUtil.notEmpty(channel, "redis subscribe channel can not be empty");this.redisTemplate = redisTemplate;this.ruleKey = ruleKey;this.channel = channel;this.ruleUpdateKey = SentinelRedisHelper.getRedisUpdateTimeKey(ruleKey);subscribeFromChannel();}@Overridepublic String readSource() throws Exception {return (String) redisTemplate.opsForValue().get(ruleKey);}@Overridepublic void close() throws Exception {super.close();redisTemplate.execute((RedisCallback<String>) connection -> {connection.getSubscription().unsubscribe(channel.getBytes(StandardCharsets.UTF_8));return REDIS_SUCCESS_MSG;});}/*** 订阅消息队列*/private void subscribeFromChannel() {redisTemplate.execute((RedisCallback<String>) connection -> {connection.subscribe((message, pattern) -> {byte[] bytes = message.getBody();String msg = new String(bytes, StandardCharsets.UTF_8);logger.info("{},接收到sentinel规则更新消息: {} ", channel, msg);try {// 收到更新通知后,从redis获取全量数据更新到内存中getProperty().updateValue(parser.convert(readSource()));} catch (Exception e) {logger.error(channel + ",接收到sentinel规则更新消息:{},更新出错:{}", msg, e.getMessage());}}, channel.getBytes(StandardCharsets.UTF_8));return REDIS_SUCCESS_MSG;});}@Overrideprotected boolean isModified() {// 根据redis的key查询是否有更新,没有更新返回false,就不用执行后面的拉取数据,提高性能String updateTimeStr = (String) redisTemplate.opsForValue().get(ruleUpdateKey);if (StringUtils.isEmpty(updateTimeStr) || updateTimeStr.equals(lastModified)) {return false;}this.lastModified = updateTimeStr;return true;}}
创建写入redis实现类
package com.XX.sentinel;import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** @Author: codeAgent <br>* @CreateDate: 2023/07/04 09:47 <br>* @Description: 收到sentinel控制的规则更新后,讲规则持久化,并发布redis订阅通知*/
public class RedisWritableDataSource<T> implements WritableDataSource<T> {private static Logger logger = LoggerFactory.getLogger(RedisWritableDataSource.class);private final String redisRuleKey;private final Converter<T, String> configEncoder;private final RedisTemplate redisTemplate;private final String redisFlowChanel;private final Lock lock;/*** 存入redis更新时间的key*/private String ruleUpdateKey;private static final String SENTINEL_RULE_CHANGE = "CHANGE";public RedisWritableDataSource(Converter<T, String> configEncoder, RedisTemplate redisTemplate, String redisRuleKey, String redisFlowChanel) {this.redisRuleKey = redisRuleKey;this.configEncoder = configEncoder;this.redisTemplate = redisTemplate;this.redisFlowChanel = redisFlowChanel;this.lock = new ReentrantLock(true);this.ruleUpdateKey = SentinelRedisHelper.getRedisUpdateTimeKey(redisRuleKey);}@Overridepublic void write(T value) throws Exception {this.lock.lock();try {logger.info("收到sentinel控制台规则写入信息,并准备持久化:{}", value);String convertResult = this.configEncoder.convert(value);redisTemplate.opsForValue().set(ruleUpdateKey, String.valueOf(System.currentTimeMillis()));redisTemplate.opsForValue().set(redisRuleKey, convertResult);logger.info("收到sentinel控制台规则写入信息,持久化后发布redis通知:{},信息:{}", this.redisFlowChanel, SENTINEL_RULE_CHANGE);redisTemplate.convertAndSend(this.redisFlowChanel, SENTINEL_RULE_CHANGE);} catch (Exception e) {logger.info("收到sentinel控制台规则写入信息,持久化出错:{}", e);throw e;} finally {this.lock.unlock();}}@Overridepublic void close() throws Exception {}
}
创建需要的配置类
package com.XX.sentinel;import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @Author: codeAgent <br>* @CreateDate: 2023/07/04 16:12 <br>* @Description: sentinel在redis持久化相关配置*/
@Component
public class SentinelRedisHelper {@Value("${project.name}")private String projectName;private static String SENTINEL_REDIS_UPDATE_TIME = "%s:updateTime";private static SentinelRedisHelper self;@PostConstructpublic void init() {self = this;}/*** 获取sentinel中配置的项目名** @return*/public static String getProjectName() {return self.projectName;}/*** 获取redis对应规则更新时间的key** @param redisKey redis对应规则的key* @return*/public static String getRedisUpdateTimeKey(String redisKey) {return String.format(SENTINEL_REDIS_UPDATE_TIME, redisKey);}
}