RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(msg_id int,status smallint ,public_time timestamp,process_time timestamp as proctime()
) APPEND ONLY;set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;select * from t_msg;--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;select * from mv_t_msg_groupby;--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (init_count int,query_count int ,callback_count int ,public_time timestamp,query_succ_process_time timestamp
)
WITH (connector='kafka',topic='t_sink_query_succ',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;select * from source_query_succ;--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/487973.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

快速排序法的名字由来,排序步骤是什么,最坏情况下的排序次数如何计算得来的呢?

问题描述&#xff1a; 快速排序法的名字由来&#xff0c;排序步骤是什么&#xff0c;最坏情况下的排序次数如何计算得来的呢&#xff1f; 问题解答&#xff1a; 快速排序法的名字来源于其排序速度快的特点。它是由英国计算机科学家 Tony Hoare 于1960年提出的&#xff0c;最…

劫持已经存在的DLL

这里找到一个成功加载的 这里先把原来程序正常的dll改名为libEGL1.dll&#xff0c;然后将我们自己的dll改名为libEGL.dll 然后再重新执行程序&#xff0c;这里同样是弹出了窗口

C++的string容器->基本概念、构造函数、赋值操作、字符串拼接、查找和替换、字符串比较、字符存取、插入和删除、子串

#include<iostream> using namespace std; #include <string> //string的构造函数 /* -string(); //创建一个空的字符串 例如: string str; -string(const char* s); //使用字符串s初始化 -string(const string& str); //使…

每日一学—由面试题“Redis 是否为单线程”引发的思考

文章目录 &#x1f4cb; 前言&#x1f330; 举个例子&#x1f3af; 什么是 Redis&#xff08;知识点补充&#xff09;&#x1f3af; Redis 中的多线程&#x1f3af; I/O 多线程&#x1f3af; Redis 中的多进程&#x1f4dd; 结论&#x1f3af;书籍推荐&#x1f525;参与方式 &a…

备战蓝桥杯—— 双指针技巧巧答链表1

对于单链表相关的问题&#xff0c;双指针技巧是一种非常广泛且有效的解决方法。以下是一些常见问题以及使用双指针技巧解决&#xff1a; 合并两个有序链表&#xff1a; 使用两个指针分别指向两个链表的头部&#xff0c;逐一比较节点的值&#xff0c;将较小的节点链接到结果链表…

C++的文件操作详解

目录 1.文本文件 1.写文件 2.读文件 2.二进制文件 1.写文件 2.读文件 1.文本文件 1.写文件 #include<bits/stdc.h> #include<fstream> using namespace std;int main() {ofstream ofs;ofs.open("text.txt",ios::out);ofs << "abc&qu…

H桥逆变控制方式(单极性倍频)

单极性倍频图像 内部做了载波取反&#xff1a;正相载波和负相载波 最后都和调制载波一起比较 正相载波&#xff1a;Q7导通为高电平&#xff0c;Q15导通为低电平 负相载波&#xff1a;Q16导通为高电平&#xff0c;Q8导通为低电平 导通次序为&#xff1a;Q7Q16——Q7Q8——Q7Q…

第3.3章:StarRocks数据导入——Stream Load

一、概述 Stream Load是StarRocks最为核心的导入方式&#xff0c;用户通过发送HTTP请求将本地文件或数据流导入至StarRocks中&#xff0c;其本身不依赖其他组件。 Stream Load支持csv和json两种数据文件格式&#xff0c;适用于数据文件数量较少且单个文件的大小不超过10GB 的场…

【微服务】国内微服务生态标准-SpringCloud Alibaba

现在已经是21世纪的二十年代&#xff0c;在未来的很长时间&#xff0c;以互联网、IOT物联网为代表的分布式应用必将越来越多&#xff0c;大量的软件企业对掌握微服务与高可用、高性能、高并发的架构人才也必定趋之若鹜。我们可以看看现阶段针对软件架构师的招聘需求和薪资&…

LoRA Land:性能优于 GPT-4 的微调开源 LLMs

我们很高兴发布 LoRA Land&#xff0c;这是 25 个经过微调的 Mistral-7b 模型的集合&#xff0c;根据任务的不同&#xff0c;它们的性能始终比基本模型高出 70%&#xff0c;GPT-4 高出 4-15%。LoRA Land 的 25 个任务专用大型语言模型 &#xff08;LLMs&#xff09; 都使用 Pre…

Java的编程之旅27——继承

1.继承的简介 继承是面向对象编程中的一个重要概念&#xff0c;指的是一个类可以继承另一个类的属性和方法。被继承的类称为父类或基类&#xff0c;继承这个父类的类称为子类或派生类。 通过继承&#xff0c;子类可以继承父类的属性和方法&#xff0c;使得子类具有相似的行为…

【线程池项目(二)】线程池FIXED模式的实现

在上一篇【线程池项目&#xff08;一&#xff09;】项目介绍和代码展示 中&#xff0c;我们展示了线程池的两个版本实现&#xff0c;它们的代码在具体的实现细节上是优化过了的。下文提供的代码并非完整&#xff0c;也有很多地方尚需改善&#xff0c;但这些差异对理解整个项目而…