Stream4Graph:动态图上的增量计算

作者:张奇

众所周知,当我们需要对数据做关联性分析的时候,一般会采用表连接(SQL join)的方式完成。但是SQL join时的笛卡尔积计算需要维护大量的中间结果,从而对整体的数据分析性能带来巨大影响。相比而言,基于图的方式维护数据的关联性,原本的关联性分析可以转换为图上的遍历操作,从而大幅降低数据分析的成本。

然而,随着数据规模的不断增长,以及对数据处理更强的实时性需求,如何高效地解决大规模图数据上的实时计算问题,就变得越来越紧迫。传统的计算引擎,如Spark、Flink对于图数据的处理已经逐渐不能满足业务日益增长的诉求,因此设计一套面向大规模图数据的实时处理引擎,将会对大数据处理技术革新带来巨大的帮助。

蚂蚁图计算团队开源的流图计算引擎GeaFlow,结合了图处理和流处理的技术优势,实现了动态图上的增量计算能力,在高性能关联性分析的基础上,进一步提升了图计算的实时性。接下来向大家介绍图计算技术的特点,业内如何解决大规模实时图计算问题,以及GeaFlow在动态图上的计算性能表现。

1. 图计算

图是一种数学结构,由节点和边组成。节点代表各种实体,比如人、地点、事物或概念,而边则表示这些节点之间的关系。例如:

  • 社交媒体:节点可以代表用户,边可以表示朋友关系。
  • 网页:节点代表网页,边代表超链接。
  • 交通网络:节点代表城市,边代表道路或航线。

图本身代表了节点与节点之间的链接关系,而针对这些关系,我们可以利用图中的节点和边来进行信息处理、分析和挖掘,帮助我们理解复杂系统中的关系和模式。在图上开展的计算活动就是图计算。图计算有很多应用场景,比如通过社交网络分析可以识别用户之间的联系,发现社群结构;通过分析网页间的链接关系来计算网页排名;通过用户的行为和偏好构建关系图,推荐相关内容和产品。

我们就以简单的社交网络分析算法,弱联通分量(Weakly Connected Components, WCC)为例。弱联通分量可以帮助我们识别用户之间的“朋友圈”或“社区”,比如某个社交平台上,一群用户通过点赞、评论或关注形成一个大的弱联通分量,而某些用户可能没有连接到这个大分量,形成更小的弱联通分量。

如果仅仅针对上面这张小图来构建弱联通分量算法,那么非常简单,我们只需要在个人PC上构建简单的点边结构然后走图遍历即可。但如果图的规模扩展的千亿甚至万亿,这时就需要用到大规模分布式图计算引擎来处理了。

2. 分布式图计算:Spark GraphX

针对图的处理一般有图计算引擎和图数据库两大类,图数据库有Neo4j‌、TigerGraph‌等,图计算引擎有Spark GraphX、Pregel等。在本文我们主要讨论图计算引擎,以Spark GraphX为例,Spark GraphX是Apache Spark的一个组件,专门用于图计算和图分析。GraphX结合了Spark的强大数据处理能力与图计算的灵活性,扩展了 Spark 的核心功能,为用户提供了一个统一的API,便于处理图数据。

那么在Spark GraphX上是如何处理图算法的呢?GraphX通过引入一种点和边都附带属性的有向多图扩展了Spark RDD这种抽象数据结构,为用户提供了一个类似于Pregel计算模型的以点为中心的并行抽象。用户需要为GraphX提供原始图graph、初始消息initialMsg、核心计算逻辑vprog、发送消息控制组件sendMsg、合并消息组件mergeMsg,计算开始时,GraphX初始阶段会激活所有点进行初始化,然后按照用户提供的发送消息组件确定接下来向那些点发送消息。在之后的迭代里,只有收到消息的点才会被激活,进行接下来的计算,如此循环往复直到链路中没有被新激活的点或者到达最大迭代次数,最后输出计算结果。

  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED],initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED]
{var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg))// compute the messagesvar messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)// Loopvar prevG: Graph[VD, ED] = nullvar i = 0while (isActiveMessagesNonEmpty && i < maxIterations) {// Receive the messages and update the vertices.prevG = gg = g.joinVertices(messages)(vprog)graphCheckpointer.update(g)// Send new messages, skipping edges where neither side received// a message. messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg, Some((oldMessages, activeDirection)))}
}

总的来说,用户首先需要将存储介质中原始的表结构数据转换为GraphX中的点边数据类型,然后交给Spark进行处理,这是针对静态图进行离线处理。但是我们知道,现实世界中,图数据的规模和数据内节点之间的关系都不是一成不变的,并且在大数据时代其变化非常快。如何实时高效的处理不断变化的图数据(动态图),是一个值得深思的问题。

3. 动态图计算:Spark Streaming

针对动态图的处理,常见的解决方案是Spark Streaming框架,它可以从很多数据源消费数据并对数据进行处理。它是是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。

如上图所示是Spark Streaming对实时数据进行处理的流程。首先Spark中的每个Receiver接收到实时消息流后,对实时消息进行解析和切分,之后将生成的图数据存储在每个Executor中。每当数据累积到一定的批次,就会触发一次全量计算,最后将计算出的结果输出给用户,这也称之为基于快照的图计算方案。

但这种方案有一个比较大的缺点,就是它存在着重复计算的问题,假如我们需要以1小时一个窗口做一次计算,那么在使用Spark进行计算时,不仅要将当前窗口的数据计算进去,历史所有数据也需要进行回溯,存在大量重复计算,这样做效率不高,因此我们需要一套能够进行增量计算的图计算方案。

4. 动态图增量计算:GeaFlow

我们知道在传统的流计算引擎中,如Flink,其处理模型允许系统能够处理不断流入的数据事件。处理每个事件时,Flink 可以评估变化并仅针对变化的部分执行计算。这意味着在增量计算过程中,Flink 会关注最新到达的数据,而不是整个数据集。于是受到Flink增量计算的启发,我们自研了增量图计算系统GeaFlow(也叫流图计算引擎),能够很好的支持增量图迭代计算。

那么GeaFlow是如何实现增量图计算的呢?首先,实时数据通过connector消息源输入的GeaFlow中,GeaFlow依据实时数据,生成内部的点边结构数据,并且将点边数据插入进底图中。当前窗口的实时数据涉及到的点会被激活,触发图迭代计算。

这里以WCC算法为例,对联通分量算法而言,在一个时间窗口内每条边对应的src id和tar id对应的顶点会被激活,第一次迭代需要将其id信息通知其邻居节点。如果邻居节点收到消息后,发现需要更新自己的信息,那么它需要继续将更新消息通知给它的邻居节点;如果说邻居节点不需要更新自己的信息,那么它就不需要通知其邻居节点,它对应的迭代终止。

5. GeaFlow架构简析

GeaFlow引擎主要由三大主要部分组成,DSL、Framework和State,同时向上为用户提供了Stream API、静态图API和动态图API。DSL主要负责图查询语言SQL+ISO/GQL的解析和执行计划的优化,同时负责schema的推导,也向外部承接了多种Connector,比如hive、hudi、kafka、odps等。Framework层负责运行时的调度和容灾,shuffle以及框架内各个组件的管理协调。State层负责存储底层图数据和数据的持久化,同时也负责索引、下推等众多性能优化工作。

6. GeaFlow性能测试

为了验证GeaFlow的增量图计算性能,我们设计了这样的实验。一批数据按照固定时间窗口实时输入到计算引擎中,我们分别用Spark和GeaFlow对全图做联通分量算法计算,比较两者计算耗时。实验在3台24核内存128G的机器上开展,使用的数据集是公开数据集soc-Livejournal,测试的图算法是弱联通分量算法。我们以50w条数据作为一个计算窗口,每输入到引擎中50w条数据,就触发一次图计算。

Spark作为批处理引擎,对于每一批窗口来的数据,不管窗口规模是大是小,都需要对增量图数据连同历史图数据进行全量计算。在Spark上,可以直接调用Spark GraphX内部内置的WCC算法进行计算。

object SparkTest {def main(args: Array[String]): Unit = {val iter_num: Int = args(0).toIntval parallel: Int = args(1).toIntval spark = SparkSession.builder.appName("HDFS Data Load").config("spark.default.parallelism", args(1)).getOrCreateval sc = new JavaSparkContext(spark.sparkContext)val graph = GraphLoader.edgeListFile(sc, "hdfs://rayagsecurity-42-033147014062:9000/" + args(2), numEdgePartitions = parallel)val result = graph.connectedComponents(10)handleResult(result)print("finish")}def handleResult[VD, ED](graph: Graph[VD, ED]): Unit = {graph.vertices.foreachPartition(_.foreach(tuple => {}))}
}

GeaFlow上支持SQL+ISO/GQL的图查询语言,我们使用图查询语言调用GeaFlow内置的增量联通分量图算法进行测试,图查询语言代码如下:

CREATE TABLE IF NOT EXISTS tables (f1 bigint,f2 bigint
) WITH (type='file',geaflow.dsl.window.size='16000',geaflow.dsl.column.separator='\t',test.source.parallel = '32',geaflow.dsl.file.path = 'hdfs://xxxx:9000/com-friendster.ungraph.txt'
);CREATE GRAPH modern (Vertex v1 (id int ID),Edge e1 (srcId int SOURCE ID,targetId int DESTINATION ID)
) WITH (storeType='memory',shardCount = 256
);INSERT INTO modern(v1.id, e1.srcId, e1.targetId)
(SELECT f1, f1, f2FROM tables
);INSERT INTO modern(v1.id)
(SELECT f2FROM tables
);CREATE TABLE IF NOT EXISTS tbl_result (vid bigint,component bigint
) WITH (ignore='true',type ='file'
);use GRAPH modern;INSERT INTO tbl_result
CALL inc_wcc(10) YIELD (vid, component)
RETURN vid, component
;

下图是对两者进行联通分量算法实验时得到的实验结果。以50w条数据为一个窗口进行迭代计算,Spark中存在大量的重复计算,因为其还要回溯全量的历史数据进行计算。而GeaFlow只会激活当前窗口中涉及到的点边进行增量计算,计算可在秒级别完成,每个窗口的计算时间基本稳定。随着数据量的不断增大,Spark进行计算时所需要回溯的历史数据就越多,在其机器容量没有达到上限的情况下,其计算时延和数据量呈正相关分布。相同情况下GeaFlow的计算时间也会略微增大,但基本可以在秒级别完成。

7. 总结

传统的图计算方案(如Spark GraphX)在近实时场景中存在重复计算问题,受Flink流处理模型和传统图计算的启发,我们给出了一套能够支持增量图计算的方案。总的来说GeaFlow主要有以下几个方面的优势:

  1. GeaFlow在处理增量实时计算时,性能优于Spark Streaming + GraphX方案,尤其是在大规模数据集上。
  2. GeaFlow通过增量计算避免了全量数据的重复处理,计算效率更高,计算时间更短性能不明显下降。
  3. GeaFlow支持SQL+GQL混合处理语言,更适合开发复杂的图数据处理任务。

GeaFlow项目代码已全部开源,我们完成了部分流图引擎基础能力的构建,未来希望基于GeaFlow构建面向图数据的统一湖仓处理引擎,以解决多样化的大数据关联性分析诉求。同时我们也在积极筹备加入Apache基金会,丰富大数据开源生态,因此非常欢迎对图技术有浓厚兴趣同学加入社区共建。

社区中有诸多有趣的工作尚待完成,你可以从如下简单的「Good First Issue」开始,期待你加入同行。

  • 支持Paimon Connector插件,连接数据湖生态。(Issue 361)
  • 优化GQL match语句性能。(Issue 363)
  • 新增ISO/GQL语法,支持same谓词。(Issue 368)
  • ...

参考链接

  1. GeaFlow项目地址:https://github.com/TuGraph-family/tugraph-analytics
  2. soc-Livejournal数据集地址:https://snap.stanford.edu/data/soc-LiveJournal1.html
  3. GeaFlow Issues:https://github.com/TuGraph-family/tugraph-analytics/issues

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/892971.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Day01 新手入土markdown练习

Markdownd 学习 二级标题 三级标题 四级标题 五级标题 六级标题 ####### 七级标题 井号键+空格+标题内容 字体设置 hello,word 两边双星号为粗体 ctrl+b hello,word 两边单个星号 ctrl+i hello,word 三个星号又粗又斜 hello,word 双波浪号删除线hello,word ctrl…

workerman GatewayWorker laralvel 即时通讯

1.官网地址https://www.workerman.net/doc/gateway-worker 2.把下载的GatewayWorker 放入laravel 框架中 3.windows 中启动GetWayWork 双击启动 只需在events.php中做调整就好(每次修改都需要重新启动才会生效) 使用 websocket 协议 Events.phpclass Events {/*** 当客户端…

特性分支开发

背景 现在git创建,合并,发布,发布后处理,每个业务条线不一致,并且没有传承记录,为保证git统一使用,并保持一致,故有如下规则。 Git常用分支包括 master:项目主分支,有且仅有一个,除项目负责人外其他开发人员不得向 master 分支合并内容。hotfix:紧急线上 bug 修复分…

第八届西湖论剑 DS-easyrawencode

easyrawencode 解压后是一个raw,vol启动虚拟机性能太差了,我直接本机win跑vol.\volatility_2.6.exe -f "C:\Users\Eth\Desktop\match\西湖\easyrawencode.raw" imageinfo.\volatility_2.6.exe -f "C:\Users\Eth\Desktop\match\西湖\easyrawencode.raw" -…

victoriametrics 基础

victoriametrics 介绍 VictoriaMetrics 是一个高性能、高扩展性的开源时间序列数据库和监控解决方案,专为处理大规模指标数据设计。 victoriametrics 特点 兼容 Prometheus API,支持 PromQL 高压缩率(比 Prometheus 高 5-10 倍) 支持水平扩展(集群模式) 低资源消耗,单节…

2025年智慧园区信创化:门禁安防+项目管理的国产工具链

随着科技的飞速发展,智慧园区的建设已成为当今社会的重要趋势。2025 年,智慧园区的信创化将成为推动园区发展的关键力量,其中门禁安防和项目管理的国产工具链将发挥重要作用。本文将深入探讨这一主题,分析其现状、优势以及未来发展前景。 门禁安防:守护园区安全的第一道防…

企业CRM采购必须评估的7个关键指标,教你避开选型雷区

选CRM这事儿,听起来挺简单,但真动手了才发现坑多得数不过来: ——选错了,钱花了,CRM系统用不起来,员工不会用,管理反而更乱了。 那CRM系统到底该怎么选?今天咱们就从大家最关心的几个点聊聊,怎么挑到合适的CRM系统,避开那些大坑,把钱花在该花的地方。83% 受访者选型…

PyQt5 QComboBox 选择框文本居中

查找了许多教程,很多都是说的“下拉选项”居中,“已选中选项”的居中 有看到重构的方法,但是可能是因为版本不对或者其他原因没能生效,多次尝试后整理方法如下(依旧使用重构的方法)class CenteredComboBox(QComboBox):def paintEvent(self, event):painter = QStylePaint…

彻底卸载eclipse

参考———— https://zhidao.baidu.com/question/2215465437491888308.html

域环境搭建

准备 去https://next.itellyou.cn/(这个是新站需要注册,相比老站提供了BT链接,下载更快。)下载所需系统,像我这使用的是:Windows server 2019Windows 7Windows 10‍ ‍ 配置 DC域控 配置IP 打开控制面板,按步骤来:网络和Internet​ -> 更改适配器设置​ -> 属性​…

京准电钟:NTP校时服务器于安防监控系统应用方案

京准电钟:NTP校时服务器于安防监控系统应用方案京准电钟:NTP校时服务器于安防监控系统应用方案 京准电钟:NTP校时服务器于安防监控系统应用方案 京准电子科技官微——ahjzszNTP校时服务器在安防监控系统中的应用方案主要通过高精度时间同步技术,解决设备间时间差异问题,确…

2025 年最值得尝试的几款 DevOps 平台工具推荐

随着软件开发和运维的深度融合,DevOps 平台已成为现代企业加速数字化转型的核心引擎。在 2025 年,面对快速迭代的市场需求与复杂的技术架构,选择一款适配性强、功能完备的 DevOps 平台,不仅是优化研发流程的关键,更是企业实现高效协作与持续交付的基石。本文将为您精选几款…