【高级应用】Flink Cep模式匹配

什么是Cep?

在流式数据中(事件流),筛选出符合条件的一系列动作(事件)【复杂事件处理

什么是 Flink-Cep?

Flink Cep库Api实时操作

官方文档

什么是Pattern?

Pattern就是Cep里的规则制定

Pattern分为个体模式组合模式(模式序列)模式组

模式组是将组合模式作为条件个体模式

Cep开发流程

  1. DataStream 或 Keyedstream
  2. 定义规则(Pattern)
  3. 规则应用于KeyedStream,生成PatternStream
  4. PatternStream,通过Select方法,将符合规则的数据输出

代码实战

依赖

        <!-- Flink-Cep --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

image-20240109153616392

Cep开发伪代码(个体模式和组合模式)

public class CepDemo {public static void main(String[] args) {// 创建流式计算上下文环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 生成DataStreamDataStream<String> dataStream = null;// 生成KeyedStream (分组)KeyedStream<String, Tuple> keyedStream = dataStream.keyBy("");// 生成模式(规则) ( Pattern 对象)/* ************************ 【个体模式】* 1. 【单例】模式:只接收1个事件* 2. 【循环】模式:能接收多个事件或1个事件, 单例模式 + 量词(times())* *********************/// 生成名叫 “login” 的单个PatternPattern<String, String> pattern =Pattern.<String>begin("login").where(new SimpleCondition<String>() {@Overridepublic boolean filter(String s) throws Exception {// Patter规则内容return false;}}).times(3);/* *********************** 【组合模式】** 组合方式:* 1. next: 严格紧邻 (连续)* 2. fallowedBy: 宽松近邻 (非连续)* 3. fallowedByAny: 非严格匹配,比 fallowedBy 更宽松** *********************/// 生成了两个Patten所组成的Pattern序列,分别名叫 "login", "sale"Pattern<String, String> patterns =Pattern.<String>begin("login")//.where().followedBy("sale");//.where();// 将 Pattern 应用于 KeyedStream, 生成 PatternStream 对象PatternStream<String> patternStream = CEP.pattern(keyedStream, patterns);// 通过PatternStream 对象的 select() 方法, 将符合规则的数据提取输出DataStream<Object> patternResult = patternStream.select(new PatternSelectFunction<String, Object>() {/*** @param map:  key: 指的是Pattern的名称。 value: 符合这个Pattern的数据*/@Overridepublic Object select(Map<String, List<String>> map) throws Exception {return null;}});}
}

【生成模式】

基于【个体模式】检测最近1分钟内登录失败超过3次的用户

CEP模式:允许这3次登录失败事件之间出现其他行为事件(不连续)【宽松近邻】

public class LoginFailBySingleton {public static void main(String[] args) {// Kafka数据源DataStream<EventPO> eventStream = KafkaUtil.read(args);// 生成KeyedStream 用户id分组KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO, Integer>) EventPO::getUser_id_int);// 生成模式 (规则/Pattern)Pattern.<EventPO>begin("login_fail_first") // Pattern名称/*1. IterativeCondition 抽象类 表示通用的匹配规则需要实现 filter(), 需要传入2个参数2.SimpleCondition 是 IterativeCondition 的子类,表示简单的匹配规则需要实现 filter(), 需要传入1个参数*/.where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) {// 登录失败事件return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).times(3) // 3次,宽松近邻.within(Time.seconds(60)); // 最近一分钟(时间)}
}

检测最近1分钟内【连续】登录失败超过3次的用户

基于【个体模式】

CEP模式:3次登录失败事件必须是连续的【严格紧邻】

添加该方法即可.consecutive()

public class LoginFailByConsecutive {public static void main(String[] args) {// KafkaDataStream<EventPO> eventStream = KafkaUtil.read(args);// 生成KeyedStreamKeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {@Overridepublic Integer getKey(EventPO eventPO) throws Exception {return eventPO.getUser_id_int();}});Pattern.<EventPO>begin("login_fail_first").where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}})/* *********************** 1. 个体模式的循环模式 匹配的是 宽松近邻 (能够允许插入其他事件)* 2. consecutive() 就指定匹配模式是 严格紧邻(连续)* *********************/.times(3).consecutive() // 连续.within(Time.seconds(60));}
}

宽松近邻与严格紧邻

宽松近邻:不连续事件

严格紧邻:连续事件

例子:

事件流1(连续登录失败的事件流):event_A(login_fail),event_B(login_fail),event_C(login_fail)【严格紧邻】

事件流2(不连续登录失败的事件流):event_A(login_fail),event_D(login_success),event_B(login_fail),event_C(login_fail)【宽松近邻】

基于【组合模式】

单体模式、组合模式通用

组合模式.next(...)严格紧邻(连续事件)

public class LoginFailByComposite {public static void main(String[] args) {DataStream<EventPO> eventStream = KafkaUtil.read(args);KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {@Overridepublic Integer getKey(EventPO eventPO) throws Exception {return eventPO.getUser_id_int();}});// 三个连续登录失败事件【组合模式】Pattern.<EventPO>begin("login_fail_first").where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).next("login_fail_second") // 严格紧邻.where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).next("login_fail_third").where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.LOGIN_FAIL.equals(eventPO.getEvent_name());}}).within(Time.seconds(60));}
}

基于【迭代条件】检测最近15分钟内IP更换次数超过3次的用户

注意

  1. 对于每个模式 (规则/Pattern)

    可以设置条件判定到达的行为事件,是否能够进入到这个模式

    如:设置条件为只有登录成功这个行为事件,才能够进入到这个模式

  2. 条件的设置方法是:where()

    where() 的参数是 IterativeCondition对象

  3. IterativeCondition 称为迭代条件

    能够设置较复杂的条件,尤其和循环模式相结合

public class IpChangeByIterative {public static void main(String[] args) {DataStream<EventPO> eventStream = KafkaUtil.read(args);KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO, Integer>) EventPO::getUser_id_int);Pattern<EventPO, ?> pattern =Pattern.// 组合模式以begin开头,// 不设置条件,所有行为事件都可以进入到这个模式<EventPO>begin("ip")// 判断用户行为事件在15分钟内IP是否发生变化(更换IP之间可以有其他事件).followedBy("next").where(new IpChangeCondition())// 15分钟内IP发生变化次数超过3次.timesOrMore(3)// 满足条件的行为事件必须在最近15分钟内.within(Time.seconds(900));// 将模式应用到事件流/* *********************** * CEP.pattern(),* 还可以有第3个参数,* 第3个参数是比较器 EventComparator 对象,* 可以对于同时进入模式的行为事件,进行更精确的排序** *********************/PatternStream<EventPO> patternStream = CEP.pattern(keyedStream, pattern);// 提取数据...}
}

判断条件

继承IterativeCondition

Context 是上下文对象,getEventsForPattern(...)根据传入的模式名获取对应模式中已匹配的所有行为事件

public class IpChangeCondition extends IterativeCondition<EventPO> {@Overridepublic boolean filter(EventPO eventPO, Context<EventPO> context) throws Exception {boolean change = false;// 当前模式名称是"ip", 获取当前模式之前已经匹配的事件for (EventPO preEvent : context.getEventsForPattern("ip")) {// 前一个行为事件的IPString preIP = preEvent.getEvent_context().getDevice().getIp();// 当前行为事件的IPString IP = eventPO.getEvent_context().getDevice().getIp();// 判断前后行为事件的IP是否发生变化if (!Objects.equals(preIP, IP)) {change = true;break;}}return change;}
}

用户在15分钟内的行为路径是"登录-领券-下单"(明显薅羊毛行为特征)

组合模式

public class ClipCouponsRoute {public static void main(String[] args) {DataStream<EventPO> eventStream = KafkaUtil.read(args);KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO, Integer>) EventPO::getUser_id_int);// 生成模式 (规则/Pattern)【组合模式】Pattern<EventPO, ?> pattern =Pattern// 过滤登录行为事件.<EventPO>begin("login").where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.LOGIN_SUCCESS.equals(eventPO.getEvent_type());}})// 宽松近邻:过滤领取优惠券行为事件.followedBy("receive").where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.COUPON_RECEIVE.equals(eventPO.getEvent_type());}})// 宽松近邻:过滤使用优惠券行为事件.followedBy("use").where(new SimpleCondition<EventPO>() {@Overridepublic boolean filter(EventPO eventPO) throws Exception {return EventConstant.COUPON_USE.equals(eventPO.getEvent_type());}})// 模式有效时间:15分钟内.within(Time.minutes(15));}
}

【提取、输出事件流】

将模式应用到事件流生成PatternStream

生成Pattern之后,就要提取输出事件流

        // ...// 生成模式 (规则/Pattern)Pattern<EventPO, ?> pattern =Pattern. <EventPO>begin("ip").followedBy("next").where(new IpChangeCondition()).timesOrMore(3).within(Time.seconds(900));// 将模式应用到事件流/* *********************** CEP.pattern(),* 还可以有第3个参数,* 第3个参数是比较器 EventComparator 对象,* 可以对于同时进入模式的行为事件,进行更精确的排序* *********************/PatternStream<EventPO> patternStream = CEP.pattern(keyedStream, pattern);

PatternStream三个提取匹配事件方法

  1. select(): 参数是 PatternSelectFunction 对象,有返回值
  2. flatselect():参数是 PatternFlatSelectFunction 对象,无返回值,可以通过 Collector.collect() 以事件流输出
  3. process(): 参数是 PatternProcessFunction 对象,无返回值,可以通过 Collector.collect() 以事件流输出也可以通过 Context对象获取上下文信息

建议使用 flatSelect(), 可以更加灵活;官方建议使用 process()

以15分钟IP变化为例,完整代码:

public class IpChangeByIterative {public static void main(String[] args) {DataStream<EventPO> eventStream = KafkaUtil.read(args);KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy((KeySelector<EventPO, Integer>) EventPO::getUser_id_int);// 生成模式 (规则/Pattern)Pattern<EventPO, ?> pattern =Pattern.<EventPO>begin("ip")               .followedBy("next").where(new IpChangeCondition()).timesOrMore(3).within(Time.seconds(900));// 将模式应用到事件流PatternStream<EventPO> patternStream = CEP.pattern(keyedStream, pattern);// 提取匹配事件DataStream<EventPO> result = patternStream.process(new IpChangeProcessFunction());// 执行规则命中的策略动作}
}
public class IpChangeProcessFunction extends PatternProcessFunction<EventPO, EventPO> {/*** @param map       Map<模式名, 模式名对应匹配事件列表>* @param context   上下文对象* @param collector 输出事件流*/@Overridepublic void processMatch(Map<String, List<EventPO>> map, Context context, Collector<EventPO> collector) throws Exception {}
}

提取输出事件流,下游算子处理

Flink-Cep基石 NFA状态转移流程

薅羊毛用户是有着明显目的的

正常用户行为事件流

image-20240112100307371

来回比较不同商品价格,最终决定购买哪件商品。

薅羊毛用户行为事件流

image-20240112100502848

带有很强的目的性,“登录-领券-下单”事件流一气呵成。

Cep底层原理

  1. CEP模式匹配:每个模式包含多个状态
  2. CEP模式匹配:状态转换的过程(NFA)

以羊毛党购买商品为例,状态变化流程:

image-20240112103126913

image-20240112103149404

image-20240112103201188

匹配上事件,设置状态,三个状态都不一样,符合条件的事件,放到结果集中,与预先设置条件数量一致。

image-20240112103249306

状态最后转换为最终状态,后续传递给下游算子计算。

CEP工作流程

  1. 定义一个一个的Pattern,如有多个Pattern,将Pattern串联起来构成模式匹配的逻辑表达
  2. 将模式匹配分拆,创建NFA对象
  3. NFA对象包含了这个模式匹配的状态和状态转换表达式
  4. 状态变化、处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/341990.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

书客、柏曼、松下护眼台灯哪款更靠谱?实测核心数据对比PK!

随着科技时代的到来&#xff0c;人们的生活水平在不断提高&#xff0c;不少家长开始担心自家孩子的近视问题&#xff0c;护眼台灯在家庭中的讨论热度也越来越高&#xff0c;光线舒适又具备多种功能&#xff0c;不少家长都给孩子入手了护眼台灯。不过作为家电博主&#xff0c;我…

一天一个设计模式---适配器模式

概念 适配器模式是一种结构型设计模式&#xff0c;用于将一个类的接口转换成客户端所期望的另一个接口。它允许不兼容的接口之间进行协同工作&#xff0c;使得原本由于接口不匹配而无法合作的类能够一起工作。 具体内容 适配器模式主要包括以下几个要素&#xff1a; 目标接…

Open CV 图像处理基础:(一)Open CV 在windows环境初始化和 Java 动态库加载方式介绍

Open CV 在windows环境初始化和 Java 动态库加载方式介绍 目录 Open CV 在windows环境初始化和 Java 动态库加载方式介绍OpenCV安装opencv-4.4.0下载安装 加载opencv-4.4.0.jar包jar包引入mavn-init.cmdjar包装载到本地maven仓库pom.xml加载动态库 加载动态库opencv_java440.dl…

2023一带一路暨金砖国家技能发展与技术创新大赛“网络安全”赛项省选拔赛样题卷①

2023金砖国家职业技能竞赛"网络安全" 赛项省赛选拔赛样题 2023金砖国家职业技能竞赛 省赛选拔赛样题第一阶段&#xff1a;职业素养与理论技能项目1. 职业素养项目2. 网络安全项目3. 安全运营 第二阶段&#xff1a;安全运营项目1. 操作系统安全配置与加固任务一Linux …

深度学习”和“多层神经网络”的区别

在讨论深度学习与多层神经网络之间的差异时&#xff0c;我们必须首先理解它们各自是什么以及它们在计算机科学和人工智能领域的角色。 深度学习是一种机器学习的子集&#xff0c;它使用了人工神经网络的架构。深度学习的核心思想是模拟人脑神经元的工作方式&#xff0c;以建立…

FPGA——时序分析与约束(Quartus II)

FPGA时序分析与约束 FPGA结构基础数据传输模型Quartus II 时序报告Quartus II 中TimeQuest的操作实操 时序分析&#xff1a;通过分析FPGA内部各个存储器之间的数据和时钟传输路径&#xff0c;来分析数据延迟和时钟延迟的关系&#xff0c;保证所有寄存器都可以正确寄存数据。 数…

The Planets:Earth

靶机下载 The Planets: Earth ~ VulnHub 信息收集 # nmap -sn 192.168.1.0/24 -oN live.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2024-01-11 09:20 CST Nmap scan report for 192.168.1.1 Host is up (0.00036s latency). MAC Address: …

使用numpy处理图片——图片拼接

大纲 左右拼接上下拼接 在《使用numpy处理图片——图片切割》一文中&#xff0c;我们介绍了如何使用numpy将一张图片切割成4部分。本文我们将反其道而行之&#xff0c;将4张图片拼接成1张图片。 基本的思路就是先用两张图以左右结构拼接成上部&#xff0c;另外两张图也以左右拼…

org.springframework.web.servlet.HandlerInterceptor

过期 1 配置黑名单 2 启动注册拦截 3 浏览器访问拦截

C#考勤系统数据分析源码

C#考勤系统数据分析源码 源码描述&#xff1a; 针对大部分考勤机采用E语言和ACCESS数据库做系统的缺陷。 做出如下建议&#xff1a; 1.打卡机设置成直接续传数据到SQL Server&#xff0c;不需要开着考勤系统和考勤安装的电脑去维持打卡记录 2.打卡机数据共享&#xff0c;把内部…

软件包安装

1.软件包分类 1.1软件包的分类 源码包二进制包脚本安装包 1.2源码包 1.2.1源码包的样子 源码包可以认为是利用不同的计算机语言而写的包, 我们打开相应的文件也能看到相应的源码 1.2.2源码包的特点 源码包的优点: 开源, 如果有足够的能力, 可以修改源代码可以自由选择…

HackTheBox-Keeper

OpenVPN连接 连接上HackTheBox&#xff01; 同时找到这个靶机&#xff0c;进行join&#xff01;分配的靶机的地址位10.10.11.227&#xff01; 信息收集 nmap -sT --min-rate 10000 -p- 10.10.11.227 开放端口为22和80端口 服务版本和操作系统信息探测&#xff1a; nmap -s…