Flink CEP(三)pattern动态更新

        线上运行的CEP中肯定经常遇到规则变更的情况,如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景,如果规则窗口过长(一两个星期),状态过大,就会导致重启时间延长,期间就会造成一些想要处理的异常行为不能及时发现。

1.实现分析

  • 外部加载:通常规则引擎会有专门的规则管理模块,提供用户去创建自己的规则,对于Flink任务来说需要到外部去加载规则
  • 动态更新:需要提供定时去检测规则是否变更
  • 历史状态清理:在模式匹配中是一系列NFAState 的不断变更,如果规则发生变更,需要清理历史状态
  • API:需要对外提供易用的API

2.代码实现

       首先实现一个用户API。

package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** @author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口(用户调用API)不区分key*/
public interface DynamicPatternFunction<T> extends Function, Serializable {/**** 初始化* @throws Exception*/public void init() throws Exception;/*** 注入新的pattern* @return*/public Pattern<T,T> inject() throws Exception;/*** 一个扫描周期:ms* @return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* @return*/public boolean isChanged() throws Exception;
}

        希望上述API的调用方式如下。

//正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction())

        所以需要修改CEP-Lib源码

        b.增加injectionPattern函数。

public class CEP {/**** Dynamic injection pattern function * @param input* @param dynamicPatternFunction* @return* @param <T>*/public static <T> PatternStream<T> injectionPattern throws Exception (DataStream<T> input,DynamicPatternFunction<T> dynamicPatternFunction){return new PatternStream<>(input, dynamicPatternFunction); }
}

        增加PatternStream构造函数,因为需要动态更新,所以有必要传进去整个函数。

public class PatternStream<T> {PatternStream(final DataStream<T> inputStream, DynamicPatternFunction<T> dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));}
}

        修改PatternStreamBuilder.build, 增加调用函数的过程。

        final CepOperator<IN, K, OUT> operator = null;if (patternFunction == null ) {operator = new CepOperator<>(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator = new CepOperator<>(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);}

        增加对应的CepOperator构造函数。

    public CepOperator(final TypeSerializer<IN> inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,@Nullable final EventComparator<IN> comparator,@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunction<IN, OUT> function,@Nullable final OutputTag<IN> lateDataOutputTag) {super(function);this.inputSerializer = Preconditions.checkNotNull(inputSerializer);this.patternFunction = patternFunction;this.isProcessingTime = isProcessingTime;this.comparator = comparator;this.lateDataOutputTag = lateDataOutputTag;if (afterMatchSkipStrategy == null) {this.afterMatchSkipStrategy = AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy = afterMatchSkipStrategy;}this.nfaFactory = null;}

        加载Pattern,构造NFA

    @Overridepublic void open() throws Exception {super.open();timerService =getInternalTimerService("watermark-callbacks", VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction != null) {patternFunction.init();Pattern pattern = patternFunction.inject();afterMatchSkipStrategy = pattern.getAfterMatchSkipStrategy();boolean timeoutHandling = getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);long period = patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period > 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() + period, this::onProcessingTime);}}nfa = nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context = new ContextFunctionImpl();collector = new TimestampedCollector<>(output);cepTimerService = new TimerServiceImpl();// metricsthis.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);}

        状态清理一共分为两块: 匹配状态数据清理、定时器清理;

        进行状态清理:

    @Overridepublic void processElement(StreamRecord<IN> element) throws Exception {if (patternFunction != null) {// 规则版本更新if (needRefresh.value() < refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器Iterable<Long> registerTime = registerTimeState.get();if (registerTime != null) {Iterator<Long> iterator = registerTime.iterator();while (iterator.hasNext()) {Long l = iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}}
}

        上面是在处理每条数据时,清除状态和版本。接下来要进行状态和版本的初始化。

    @Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction != null) {/*** 两个标识位状态*/refreshFlagState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<Integer>("refreshFlagState", Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion = new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion = new AtomicInteger(0);}needRefresh = context.getKeyedStateStore().getState(new ValueStateDescriptor<Integer>("needRefreshState", Integer.class, 0));}
}

3.测试验证

        设置每10s变更一次Pattern。

 PatternStream patternStream = CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunction<Tuple3<String, Long, String>, Map>() {@Overridepublic Map select(Map map) throws Exception {map.put("processingTime", System.currentTimeMillis());return map;}}).print();env.execute("SyCep");}public static class TestDynamicPatternFunction implements DynamicPatternFunction<Tuple3<String, Long, String>> {public TestDynamicPatternFunction() {this.flag = true;}boolean flag;int time = 0;@Overridepublic void init() throws Exception {flag = true;}@Overridepublic Pattern<Tuple3<String, Long, String>, Tuple3<String, Long, String>> inject()throws Exception {// 2种patternif (flag) {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success");}}).times(1).followedBy("middle").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail");}}).times(1).next("end");return pattern;} else {Pattern pattern = Pattern.<Tuple3<String, Long, String>>begin("start2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("success2");}}).times(2).next("middle2").where(new IterativeCondition<Tuple3<String, Long, String>>() {@Overridepublic boolean filter(Tuple3<String, Long, String> value,Context<Tuple3<String, Long, String>> ctx) throws Exception {return value.f2.equals("fail2");}}).times(2).next("end2");return pattern;}}@Overridepublic long getPeriod() throws Exception {return 10000;}@Overridepublic boolean isChanged() throws Exception {flag = !flag ;time += getPeriod();System.out.println("change pattern : " + time);return true;}}

打印结果:符合预期

4.源码地址

感觉有用的话,帮忙点个小星星。^_^

 GitHub - StephenYou520/SyCep: CEP 动态Pattern

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/54131.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AcWing 4310:树的DFS ← vector、auto、邻接表

【题目来源】https://www.acwing.com/problem/content/description/4313/【题目描述】 给定一棵 n 个节点的树。 节点的编号为 1∼n&#xff0c;其中 1 号节点为根节点&#xff0c;每个节点的编号都大于其父节点的编号。 现在&#xff0c;你需要回答 q 个询问。 每个询问给定两…

Python实现决策树算法:完整源码逐行解析

决策树是一种常用的机器学习算法&#xff0c;它可以用来解决分类和回归问题。决策树的优点是易于理解和解释&#xff0c;可以处理数值和类别数据&#xff0c;可以处理缺失值和异常值&#xff0c;可以进行特征选择和剪枝等操作。决策树的缺点是容易过拟合&#xff0c;对噪声和不…

C# 外观模式

概述 外观模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;它提供了一个统一的接口&#xff0c;用于访问子系统中的一组接口。外观模式隐藏了子系统的复杂性&#xff0c;使得客户端可以通过简单的接口与子系统进行交互。 外观模式定义了一个高层…

mediasoup Lite ICE实现说明

目录 一. 前言 二. Lite ICE流程 三. STUN协议说明 STUN Header STUN Body 四. mediasoup Lite ICE实现源码剖析 一. 前言 ICE 是一种交互式建立连接的流程协议。ICE 有两种模式&#xff08;Full ICE 和 Lite ICE&#xff09;&#xff0c;Full ICE 要求建立连接的双方都要…

iOS——锁与死锁问题

iOS中的锁 什么是锁锁的分类互斥锁1. synchronized2. NSLock3. pthread 递归锁1. NSRecursiveLock2. pthread 信号量Semaphore1. dispatch_semaphore_t2. pthread 条件锁1. NSCodition2. NSCoditionLock3. POSIX Conditions 分布式锁NSDistributedLock 读写锁1. dispatch_barri…

AOF日志:宕机了,Redis如何避免数据丢失

当服务器宕机后&#xff0c;数据全部丢失&#xff1a;我们很容易想到的一个解决方案是从后端数据库恢复这些数据&#xff0c;但这种方式存在两个问题&#xff1a;一是&#xff0c;需要频繁访问数据库&#xff0c;会给数据库带来巨大的压力&#xff1b;二是&#xff0c;这些数据…

Rust 编程小技巧摘选(6)

目录 Rust 编程小技巧(6) 1. 打印字符串 2. 重复打印字串 3. 自定义函数 4. 遍历动态数组 5. 遍历二维数组 6. 同时遍历索引和值 7. 迭代器方法的区别 8. for_each() 用法 9. 分离奇数和偶数 10. 判断素数&#xff08;质数&#xff09; Rust 编程小技巧(6) 1. 打印…

剑指offer60.n个骰子的点数

这道题很简单&#xff0c;看完题目就会。看完题就会想到用动态规划的方法&#xff0c;如果我要用i个骰子拿到j个点数&#xff0c;那么我只能在i-1个骰子拿到j-1个点的情况下再用第i个骰子投出一个1&#xff0c;或者i-1个骰子拿到j-2个点的情况下再用第i个骰子投出一个2&#xf…

Unity学习参考文档和开发工具

☺ unity的官网文档&#xff1a;脚本 - Unity 手册 ■ 学习方式&#xff1a; 首先了解unity相关概述&#xff0c;快速认识unity编辑器&#xff0c;然后抓住重点的学&#xff1a;游戏对象、组件|C#脚本、预制体、UI ☺ 学习过程你会发现&#xff0c;其实Unity中主要是用c#进行开…

[Docker实现测试部署CI/CD----自由风格和流水线的CD操作(6)]

目录 12、自由风格的CD操作发布 V1.0.0 版本修改代码并推送GitLab 中项目打 Tag 发布 V2.0.0 版本Jenkins 配置 tag 参数添加 Git 参数添加 checkout 命令修改构建命令配置修改 SSH 配置 部署 v1.0.0重新构建工程构建结果 部署 v2.0.0重新构建工程访问 部署v3.0.0 13、流水线任…

微信小程序animation动画,微信小程序animation动画无限循环播放

需求是酱紫的&#xff1a; 页面顶部的喇叭通知&#xff0c;内容不固定&#xff0c;宽度不固定&#xff0c;就是做走马灯&#xff08;轮播&#xff09;效果&#xff0c;从左到右的走马灯&#xff08;轮播&#xff09;&#xff0c;每播放一遍暂停 1500ms &#xff5e; 2000ms 刚…

npm -v无法显示版本号

情况&#xff1a; 删除C盘下.npmrc文件后解决。路径 C:\Users\Dell 记录一下这个解法。