老生常谈——分布式限流:部分Sentinal源码解读

news/2024/12/21 14:20:30/文章来源:https://www.cnblogs.com/xy1997/p/18620730
  1. 基础知识

HTTP CODE = 429 “请求过多”

A. 限流的类型

  • 服务端

  • 客户端

限流的标的

  • IP

  • 用户

  • ...

基本要求

  • 准确限制过量的请求。

  • 低延时。限流器不能拖慢HTTP响应时间。

  • 尽量占用较少的内存。

  • 这是一个分布式限流器,可以在多个服务器或者进程之间共享。

  • 需要处理异常。当用户的请求被拦截时,给用户展示明确的异常信息。

  • 高容错性。如果限流器出现任何问题(比如某个缓存服务器宕机),不能影响整个系统。

  1. 限流算法

A. 漏桶算法(Leaking Bucket)

基本原理

当一个请求到达时,系统先检查桶是否已满。如果没有,就将请求添加到队列中。否则,丢弃请求。定期从队列中取出请求并进行处理。(控制消费的速率)

B. 代币桶算法(Token Bucket)

基本原理

代币桶是一个有预定义容量的容器。代币按照预定的速率被放入桶中。一旦桶被装满,就不再往里面添加代币。如如果桶满了,多出来的代币就会溢出。(控制进入的速率)

Sentinal实现
// 这个类及其相关的类是近一年才被加入到Sentinal中的,在主流程中未看到直接的调用class TokenBucket {private final long maxTokens;private final long intervalMillis;private volatile long nextUpdate;private AtomicLong tokens;public TokenBucket(long maxTokens, long intervalMillis) {if (maxTokens <= 0) {throw new IllegalArgumentException("maxTokens should > 0, but given: " + maxTokens);}if (intervalMillis < 1000) {throw new IllegalArgumentException("intervalMillis should be at least 1000, but given: " + intervalMillis);}this.maxTokens = maxTokens;this.intervalMillis = intervalMillis;this.nextUpdate = System.currentTimeMillis() / 1000 * 1000 + intervalMillis;//第一次全量注入tokenthis.tokens = new AtomicLong(maxTokens);}public boolean accept(long now) {long currTokens;// 到期自动全量注入tokenif (now > nextUpdate) {currTokens = tokens.get();if (tokens.compareAndSet(currTokens, maxTokens)) {nextUpdate = System.currentTimeMillis() / 1000 * 1000 + intervalMillis;}}// 尝试获取tokendo {currTokens = tokens.get();} while (currTokens > 0 && !tokens.compareAndSet(currTokens, currTokens - 1));return currTokens > 0;}
}
优点

算法容易实现。

内存的使用效率高。允许在很短时间内出现突发流量。

只要还有代币,请求就可以通过。

C. 固定窗口计数器算法(Fixed Window Counter)

顾名思义,问题在于,请求如果集中在某个窗口的两侧则可能会出现溢出拒绝。例如每分钟限流5个请求,窗口边界从00->59,则若第01s进入5个请求,第58s进入5个请求,后五个请求将被拒绝。

D. 滑动窗口(LeapArray - Sentinal)

LeapArray提供窗口操作的核心API,结合不同的Bucket(提供统计能力)泛型实现可以演变出不同的子类

// com.alibaba.csp.sentinel.slots.statistic.base.LeapArray -> 提供了窗口相关的核心apipublic abstract class LeapArray<T> {protected int windowLengthInMs;protected int sampleCount;protected int intervalInMs;private double intervalInSecond;protected final AtomicReferenceArray<WindowWrap<T>> array;/*** The conditional (predicate) update lock is used only when current bucket is deprecated.*/private final ReentrantLock updateLock = new ReentrantLock();/*** The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.** @param sampleCount  bucket count of the sliding window* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds*/public LeapArray(int sampleCount, int intervalInMs) {AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");this.windowLengthInMs = intervalInMs / sampleCount;this.intervalInMs = intervalInMs;this.intervalInSecond = intervalInMs / 1000.0;this.sampleCount = sampleCount;this.array = new AtomicReferenceArray<>(sampleCount);}/*** Get the bucket at current timestamp.** @return the bucket at current timestamp*/public WindowWrap<T> currentWindow() {return currentWindow(TimeUtil.currentTimeMillis());}/*** Create a new statistic value for bucket.** @param timeMillis current time in milliseconds* @return the new empty bucket*/public abstract T newEmptyBucket(long timeMillis);/*** Reset given bucket to provided start time and reset the value.** @param startTime  the start time of the bucket in milliseconds* @param windowWrap current bucket* @return new clean bucket at given start time*/protected abstract WindowWrap<T> resetWindowTo(WindowWrap<T> windowWrap, long startTime);private int calculateTimeIdx(/*@Valid*/ long timeMillis) {long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) {return timeMillis - timeMillis % windowLengthInMs;}/*** Get bucket item at provided timestamp.** @param timeMillis a valid timestamp in milliseconds* @return current bucket item at provided timestamp if the time is valid; null if time is invalid*/public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket.*/while (true) {WindowWrap<T> old = array.get(idx);if (old == null) {/**     B0       B1      B2    NULL      B4* ||_______|_______|_______|_______|_______||___* 200     400     600     800     1000    1200  timestamp*                             ^*                          time=888*            bucket is empty, so create new and update** If the old bucket is absent, then we create a new bucket at {@code windowStart},* then try to update circular array via a CAS operation. Only one thread can* succeed to update, while other threads yield its time slice.*/WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart == old.windowStart()) {/**     B0       B1      B2     B3      B4* ||_______|_______|_______|_______|_______||___* 200     400     600     800     1000    1200  timestamp*                             ^*                          time=888*            startTime of Bucket 3: 800, so it's up-to-date** If current {@code windowStart} is equal to the start timestamp of old bucket,* that means the time is within the bucket, so directly return the bucket.*/return old;} else if (windowStart > old.windowStart()) {/**   (old)*             B0       B1      B2    NULL      B4* |_______||_______|_______|_______|_______|_______||___* ...    1200     1400    1600    1800    2000    2200  timestamp*                              ^*                           time=1676*          startTime of Bucket 2: 400, deprecated, should be reset** If the start timestamp of old bucket is behind provided time, that means* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.* Note that the reset and clean-up operations are hard to be atomic,* so we need a update lock to guarantee the correctness of bucket update.** The update lock is conditional (tiny scope) and will take effect only when* bucket is deprecated, so in most cases it won't lead to performance loss.*/if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available.Thread.yield();}} else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}
}
  1. 限流计数与规则

限流计数

  • 单机:本地LoadingCache等支持过期时间的表

  • 分布式:redis

    • 参见:Better Rate Limiting With Redis Sorted Sets,利用ZSET来避免竞态条件下,系统性能被锁瓶颈影响。(简单来说就是把单变量的get -> add 1 的操作变成 add时间戳,通过count时间戳的数量来确认是否限流)

限流策略

  • 拒绝

  • 放入消息队列,后续消费

  1. Sentinal源码分析

核心抽象

  • 资源

  • 规则 -> FlowRule.class

  • 流控、降级、热点、授权

请求是如何被统计的?

调用链
com.alibaba.csp.sentinel.CtSph#entry(java.lang.String)// 业务侧的起点,会初始化出调用链,并且Context类的资源是保存在ThreadLocal中的,// 也就是再一次调用中复用的同一个 Context,因此可以通过Context的来自动构建Node树com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain#entry// ProcessorSlot的调用链,顺序依照public static final int ORDER_NODE_SELECTOR_SLOT = -10000;public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;public static final int ORDER_LOG_SLOT = -8000;public static final int ORDER_STATISTIC_SLOT = -7000;public static final int ORDER_AUTHORITY_SLOT = -6000;public static final int ORDER_SYSTEM_SLOT = -5000;public static final int ORDER_FLOW_SLOT = -2000;public static final int ORDER_DEFAULT_CIRCUIT_BREAKER_SLOT = -1500;com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot#entry// 会尝试初始化这次调用的Node类,Node类用于统计请求指标public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)throws Throwable {/** It's interesting that we use context name rather resource name as the map key.** Remember that same resource({@link ResourceWrapper#equals(Object)}) will share* the same {@link ProcessorSlotChain} globally, no matter in which context. So if* code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},* the resource name must be same but context name may not.** If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to* enter same resource in different context, using context name as map key can* distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created* of the same resource name, for every distinct context (different context name) each.** Consider another question. One resource may have multiple {@link DefaultNode},* so what is the fastest way to get total statistics of the same resource?* The answer is all {@link DefaultNode}s with same resource name share one* {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.*/DefaultNode node = map.get(context.getName());if (node == null) {synchronized (this) {node = map.get(context.getName());if (node == null) {node = new DefaultNode(resourceWrapper, null);// DefaultNode具有统计功能HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());cacheMap.putAll(map);cacheMap.put(context.getName(), node);map = cacheMap;// Build invocation tree// 最上层的调用发起处在此处将会添加到entranceNode的child字段中,// 非最上层的调用发起处则会添加到curNode的child字段中((DefaultNode) context.getLastNode()).addChild(node);}}}context.setCurNode(node);fireEntry(context, resourceWrapper, node, count, prioritized, args);}com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry// 而后会通过这个slow进行统计, 这也就是为什么最先执行NodeSelectorSlot,然后再是StatisticSlot// 最后再是一些业务规则public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {try {// Do some checking.fireEntry(context, resourceWrapper, node, count, prioritized, args);// Request passed, add thread count and pass count.node.increaseThreadNum();node.addPassRequest(count);if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();context.getCurEntry().getOriginNode().addPassRequest(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();Constants.ENTRY_NODE.addPassRequest(count);}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (PriorityWaitException ex) {node.increaseThreadNum();if (context.getCurEntry().getOriginNode() != null) {// Add count for origin node.context.getCurEntry().getOriginNode().increaseThreadNum();}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseThreadNum();}// Handle pass event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onPass(context, resourceWrapper, node, count, args);}} catch (BlockException e) {// Blocked, set block exception to current entry.context.getCurEntry().setBlockError(e);// Add block count.node.increaseBlockQps(count);if (context.getCurEntry().getOriginNode() != null) {context.getCurEntry().getOriginNode().increaseBlockQps(count);}if (resourceWrapper.getEntryType() == EntryType.IN) {// Add count for global inbound entry node for global statistics.Constants.ENTRY_NODE.increaseBlockQps(count);}// Handle block event with registered entry callback handlers.for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {handler.onBlocked(e, context, resourceWrapper, node, count, args);}throw e;} catch (Throwable e) {// Unexpected internal error, set error to current entry.context.getCurEntry().setError(e);throw e;}}

限流规则是如何被apply的?

核心责任链处理器接口
public interface ProcessorSlot<T> {/*** Entrance of this slot.** @param context         current {@link Context}* @param resourceWrapper current resource* @param param           generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}* @param count           tokens needed* @param prioritized     whether the entry is prioritized* @param args            parameters of the original call* @throws Throwable blocked exception or unexpected error*/void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,Object... args) throws Throwable;/*** Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.** @param context         current {@link Context}* @param resourceWrapper current resource* @param obj             relevant object (e.g. Node)* @param count           tokens needed* @param prioritized     whether the entry is prioritized* @param args            parameters of the original call* @throws Throwable blocked exception or unexpected error*/void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,Object... args) throws Throwable;/*** Exit of this slot.** @param context         current {@link Context}* @param resourceWrapper current resource* @param count           tokens needed* @param args            parameters of the original call*/void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);/*** Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.** @param context         current {@link Context}* @param resourceWrapper current resource* @param count           tokens needed* @param args            parameters of the original call*/void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

// 责任链的处理顺序 授权规则 -> 系统规则 -> 热点规则 ->  流量控制规则 -> 降级规则 
/*** Order of default processor slots*/
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEFAULT_CIRCUIT_BREAKER_SLOT = -1500;
public static final int ORDER_DEGRADE_SLOT = -1000;

整体设计上是通过实现不同的XXXSlot类(提供一些模板性质的方法),并为每个类关联不同的XXXRule来实现级联的规则校验。例如在热点控制这个Slot中,可以为某个特定的Api资源配置多个不同的热点参数值,这些不同的热点参数就会具象为不同的ParamFlowRule

FlowSlot
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;public FlowSlot() {this(new FlowRuleChecker());}/*** Package-private for test.** @param checker flow rule checker* @since 1.6.1*/FlowSlot(FlowRuleChecker checker) {AssertUtil.notNull(checker, "flow checker should not be null");this.checker = checker;}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)throws BlockException {checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {@Overridepublic Collection<FlowRule> apply(String resource) {return FlowRuleManager.getFlowRules(resource);}};
}

可以看到核心的规则逻辑是体现在FlowRule中的,通过FlowRuleManager获取到。校验的时候提供了两种机制,集群和本地

// FlowRuleChecker.classpublic boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {String limitApp = rule.getLimitApp();if (limitApp == null) {return true;}if (rule.isClusterMode()) {return passClusterCheck(rule, context, node, acquireCount, prioritized);}return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

rater主要有以上四种实现,底层基本是依赖滑动窗口算法

Why?

  • 滑动窗口可以更方便的列出一些统计信息,从而进行额外的限流功能的扩展

  • 令牌桶算法还需要维护token的注入速度,并且只能for限流使用。而在其他场景,还是需要滑动窗口相关的数据结构来统计一些系统指标

DefualtController
// 依赖defaultNode中统计出的qps数,本质是滑动窗口统计的结果
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {int curCount = avgUsedTokens(node);if (curCount + acquireCount > count) {if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime = TimeUtil.currentTimeMillis();waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.throw new PriorityWaitException(waitInMs);}}return false;}return true;
}
WarmUpLimiter
 @Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {// 1. 获取前一个统计周期的 QPSlong previousQps = (long) node.previousPassQps();// 同步令牌,基于之前的 QPS 更新当前存储的令牌数syncToken(previousQps);long currentTime = TimeUtil.currentTimeMillis();long restToken = storedTokens.get();  // 获取当前存储的令牌数long costTime = 0;  // 计算本次请求需要的时间long expectedTime = 0;  // 预期执行时间// 2. 令牌数超过警戒值,说明系统处于预热阶段if (restToken >= warningToken) {// 计算超出警戒值的令牌数long aboveToken = restToken - warningToken;// 根据斜率计算预热期间的 QPS// warmingQps 会随着 restToken 的减少而增加double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));// 计算按照预热 QPS 处理请求需要的时间costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);} else {// 3. 令牌数低于警戒值,按照目标 QPS 处理costTime = Math.round(1.0 * (acquireCount) / count * 1000);}// 4. 计算预期完成时间expectedTime = costTime + latestPassedTime.get();// 5. 如果预期完成时间小于当前时间,说明可以立即处理if (expectedTime <= currentTime) {latestPassedTime.set(currentTime);return true;} else {// 6. 需要等待的情况long waitTime = costTime + latestPassedTime.get() - currentTime;// 等待时间超过最大超时时间,直接拒绝if (waitTime > timeoutInMs) {return false;} else {// 7. 更新最新通过时间并等待long oldTime = latestPassedTime.addAndGet(costTime);try {waitTime = oldTime - TimeUtil.currentTimeMillis();if (waitTime > timeoutInMs) {latestPassedTime.addAndGet(-costTime);return false;}if (waitTime > 0) {Thread.sleep(waitTime);}return true;} catch (InterruptedException e) {}}}return false;}
} 
ParamFlowSlot

主要的作用是可以根据一些请求的参数值,进行限流,比如促销时一些热点的品类

// com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot#checkFlowvoid checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {if (args == null) {return;}if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {return;}List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());for (ParamFlowRule rule : rules) {applyRealParamIdx(rule, args.length);// Initialize the parameter metrics.ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {String triggeredParam = "";if (args.length > rule.getParamIdx()) {Object value = args[rule.getParamIdx()];// Assign actual value with the result of paramFlowKey methodif (value instanceof ParamFlowArgument) {value = ((ParamFlowArgument) value).paramFlowKey();}triggeredParam = String.valueOf(value);}throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);}}
}//集群模式,使用本地模式作为兜底策略。集群模式依赖中心化的限流服务,SDK中也提供了嵌入的服务端服务
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {try {TokenService clusterService = pickClusterService();if (clusterService == null) {return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);}long flowId = rule.getClusterConfig().getFlowId();TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);return applyTokenResult(result, rule, context, node, acquireCount, prioritized);// If client is absent, then fallback to local mode.} catch (Throwable ex) {RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);}// Fallback to local flow control when token client or server for this rule is not available.// If fallback is not enabled, then directly pass.return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}// 本地模式则直接使用SDK中的类进行校验,核心逻辑在Rater字段上,rater是一系列实现了com.alibaba.csp.sentinel.slots.block.flow.TrafficShapingController
// 接口的类
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
SystemSlot

主要是check一些系统层面的宏观统计指标,如全局的qps,cpu利用率,load,线程数等等

@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {SystemRuleManager.checkSystem(resourceWrapper, count);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}}// SystemRuleManager.checkSystem 中的一部端核心逻辑
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {if (resourceWrapper == null) {return;}// Ensure the checking switch is on.if (!checkSystemStatus.get()) {return;}// for inbound traffic onlyif (resourceWrapper.getEntryType() != EntryType.IN) {return;}// total qpsdouble currentQps = Constants.ENTRY_NODE.passQps();if (currentQps + count > qps) {throw new SystemBlockException(resourceWrapper.getName(), "qps");}// total threadint currentThread = Constants.ENTRY_NODE.curThreadNum();if (currentThread > maxThread) {throw new SystemBlockException(resourceWrapper.getName(), "thread");}double rt = Constants.ENTRY_NODE.avgRt();if (rt > maxRt) {throw new SystemBlockException(resourceWrapper.getName(), "rt");}// load. BBR algorithm.if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {if (!checkBbr(currentThread)) {throw new SystemBlockException(resourceWrapper.getName(), "load");}}// cpu usageif (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {throw new SystemBlockException(resourceWrapper.getName(), "cpu");}
}
AuthoritySlot

主要是一些白名单和黑名单的逻辑

// 逻辑也非常简单,直接获取具体的auth规则去校验即可
@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)throws Throwable {checkBlackWhiteAuthority(resourceWrapper, context);fireEntry(context, resourceWrapper, node, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {List<AuthorityRule> rules = AuthorityRuleManager.getRules(resource.getName());if (rules == null) {return;}for (AuthorityRule rule : rules) {if (!AuthorityRuleChecker.passCheck(rule, context)) {throw new AuthorityException(context.getOrigin(), rule);}}}
}
CircuitBreakerSlot

和熔断降级相关的职责,围绕状态机OPEN, HALF_OPEN和CLOSE展开

@Spi(order = Constants.ORDER_DEFAULT_CIRCUIT_BREAKER_SLOT)
public class DefaultCircuitBreakerSlot extends AbstractLinkedProcessorSlot<DefaultNode> {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {performChecking(context, resourceWrapper);fireEntry(context, resourceWrapper, node, count, prioritized, args);}private void performChecking(Context context, ResourceWrapper r) throws BlockException {// If user has set a degrade rule for the resource, the default rule will not be activatedif (DegradeRuleManager.hasConfig(r.getName())) {return;}List<CircuitBreaker> circuitBreakers = DefaultCircuitBreakerRuleManager.getDefaultCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {return;}for (CircuitBreaker cb : circuitBreakers) {if (!cb.tryPass(context)) {throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());}}}@Overridepublic void exit(Context context, ResourceWrapper r, int count, Object... args) {Entry curEntry = context.getCurEntry();if (curEntry.getBlockError() != null) {fireExit(context, r, count, args);return;}if (DegradeRuleManager.hasConfig(r.getName())) {fireExit(context, r, count, args);return;}List<CircuitBreaker> circuitBreakers = DefaultCircuitBreakerRuleManager.getDefaultCircuitBreakers(r.getName());if (circuitBreakers == null || circuitBreakers.isEmpty()) {fireExit(context, r, count, args);return;}if (curEntry.getBlockError() == null) {// passed requestfor (CircuitBreaker circuitBreaker : circuitBreakers) {circuitBreaker.onRequestComplete(context);}}fireExit(context, r, count, args);}
}// OPEN 代表启用熔断, HALF_OPEN时如果抛出异常则重新回到OPEN,否则变为CLOSE,CLOSE代表不启用熔断规则

参考引用

  1. 搞定系统设计:面试敲开大厂的门

  2. Sentinal源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/856351.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

可扩展系统——基于SPI扩展

一、我们为什么讨论SPI? 为具有悠久历史的大型项目(屎山)添加新功能时,我们常常不太好评估变更的影响范围。因为原系统不具备良好的扩展性,导致修改整体发散,且不易单测。此时可以考虑使用接口来描述业务逻辑较为稳定的流程,并使用SPI机制来灵活的隔离加载实际的实现,来…

大模型--采样技术 TopK TopP 惩罚系数--37

目录1. 参考2. 概述重复惩罚(Repetition Penalty) 1. 参考 https://mp.weixin.qq.com/s/mBZA6PaMotJw7WeVdA359g 2. 概述 大型语言模型(LLMs)通过“根据上下文预测下一个 token 的概率分布”来生成文本。最简单的采样方法是贪心采样(Greedy Sampling),它在每一步选择概率…

关于分布式锁的的思考

关于分布式锁的的思考 结论先行: 对于分布式锁我们在考虑不同方案的时候需要先思考需要的效果是什么?为了效率(efficiency),协调各个客户端避免做重复的工作。即使锁偶尔失效了,只是可能把某些操作多做一遍而已,不会产生其它的不良后果。比如重复发送了一封同样的 email(…

2024-12-21:从魔法师身上吸取的最大能量。用go语言,在一个神秘的地牢里,有 n 名魔法师排成一列。每位魔法师都有一个能量属性,有的提供正能量,而有的则会消耗你的能量。 你被施加了一种诅咒,吸

2024-12-21:从魔法师身上吸取的最大能量。用go语言,在一个神秘的地牢里,有 n 名魔法师排成一列。每位魔法师都有一个能量属性,有的提供正能量,而有的则会消耗你的能量。 你被施加了一种诅咒,吸收来自第 i 位魔法师的能量后,你会立即被传送到第 (i + k) 位魔法师。在这个…

平替兼容MFRC523|国产13.56MHz智能门锁NFC读卡芯片KYN523

NFC是一种非接触式识别和互联技术,可以在移动设备、消费类电子产品等设备间进行近距离无线通信。通过 NFC 可实现数据传输、移动支付等功能。 KYN523是一款高度集成的工作在 13.56MHZ 下的非接触读写器芯片,支持符合ISO/IEC 14443 TypeA、ISO/IEC 14443 TypeB 协议的非接触读…

redis-cli (error) NOAUTH Authentication required问题解决

1.查找redis-cli所在目录 which redis-cli2.切换到redis-cli目录3.切换到usr/bin 目录 执行以下命令redis-cli -h ip -p port 4. 验证redis登录密码 auth password5.获取redis数据

快速幂优化高精度乘法

NOI 1.6 12 题目描述题目给出的 \(n\) 最大可以取到 \(100\) ,即计算 \(2^{100}\) ,明显是超过 long long 的 \(2^{63}-1\),所以需要使用高精度来计算幂次方的乘法简单的高精度,即每次计算一个小整数乘上一个大整数循环 \(n\) 次,每次对上一次的积都乘以 \(2\) vector<…

Docker网络基础知识

Docker 网络是 Docker 容器之间以及容器与主机或其他网络资源之间通信的基础。Docker网络基础1.默认网络当你启动一个容器是,如果没有特别指定网络,它会自动连接到Docker的默认桥接网络(bridge network)。 这个默认的桥接网络通常被称为bridge,它允许容器之间通过IP地址相…

川土微代理商深圳|CA-IS3740,CA-IS3741,CA-IS3742高性能四通道数字隔离芯片

CA-IS3740,CA-IS3741,CA-IS3742产品特性 •信号传输速率:DCto150Mbps •宽电源电压范围:2.5Vto5.5V •宽温度范围:‐40Cto125C •无需启动初始化 •默认输出高电平和低电平选项 •优异的电磁抗扰度 •高CMTI:150kV/s(典型值) •低功耗,(典型值): ▪电流为1.5mA/通道(@5…

大学8086汇编debug——关于int3断点之后继续调试的方法

预先 在汇编中打入int 3,然后在debug中利用G,就可以一路运行到断点处。 正文 在断点上可以用U来查看上下代码的位置断点后运行 然后用-g=xxxx:xxxx可以运行到下一个断点,或是直接运行至结束 还可以用-t=xxxx:xxxx逐步运行 注意:xxxx:xxxx是int 3下一个命令的地址

sunny替换响应体

本文来自博客园,作者:__username,转载请注明原文链接:https://www.cnblogs.com/code3/p/18620658