【大数据】美团 DB 数据同步到数据仓库的架构与实践

美团 DB 数据同步到数据仓库的架构与实践

  • 1.背景
  • 2.整体架构
  • 3.Binlog 实时采集
  • 4.离线还原 MySQL 数据
  • 5.Kafka2Hive
  • 6.对 Camus 的二次开发
  • 7.Checkdone 的检测逻辑
  • 8.Merge
  • 9.Merge 流程举例
  • 10.实践一:分库分表的支持
  • 11.实践二:删除事件的支持
  • 12.总结与展望

1.背景

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为 ODSOperational Data Store)数据。在互联网企业中,常见的 ODS 数据有 业务日志数据Log)和 业务 DB 数据DB)两类。对于业务 DB 数据来说,从 MySQL 等关系型数据库的业务数据进行采集,然后导入到 Hive 中,是进行数据仓库生产的重要环节。

如何准确、高效地把 MySQL 数据同步到 Hive 中?一般常用的解决方案是批量取数并 Load:直连 MySQL 去 Select 表中的数据,然后存到本地文件作为中间存储,最后把文件 Load 到 Hive 表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:

  • 性能瓶颈:随着业务规模的增长,Select From MySQLSave to LocalfileLoad to Hive 这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。
  • 直接从 MySQL 中 Select 大量数据,对 MySQL 的影响非常大,容易造成慢查询,影响业务线上的正常服务。
  • 由于 Hive 本身的语法不支持更新、删除等 SQL 原语,对于 MySQL 中发生 Update / Delete 的数据无法很好地进行支持。

为了彻底解决这些问题,我们逐步转向 CDCChange Data Capture)+ Merge 的技术方案,即实时 Binlog 采集 + 离线处理 Binlog 还原业务数据的这样一套解决方案。Binlog 是 MySQL 的二进制日志,记录了 MySQL 中发生的所有数据变更,MySQL 集群自身的主从同步就是基于 Binlog 做的

本文主要从 Binlog 实时采集离线处理 Binlog 还原业务数据 两个方面,来介绍如何实现 DB 数据准确、高效地进入数仓。

2.整体架构

在这里插入图片描述
整体的架构如上图所示。在 Binlog 实时采集方面,我们采用了阿里巴巴的开源项目 Canal,负责从 MySQL 实时拉取 Binlog 并完成适当解析。Binlog 采集后会暂存到 Kafka 上供下游消费。整体实时采集部分如图中红色箭头所示。

离线处理 Binlog 的部分,如图中黑色箭头所示,通过下面的步骤在 Hive 上还原一张 MySQL 表:

  • 采用 Linkedin 的开源项目 Camus,负责每小时把 Kafka 上的 Binlog 数据拉取到 Hive 上。
  • 对每张 ODS 表,首先需要一次性制作快照(Snapshot),把 MySQL 里的存量数据读取到 Hive 上,这一过程底层采用直连 MySQL 去 Select 数据的方式。
  • 对每张 ODS 表,每天基于存量数据和当天增量产生的 Binlog 做 Merge,从而还原出业务数据。

我们回过头来看看,背景中介绍的批量取数并 Load 方案遇到的各种问题,为什么用这种方案能解决上面的问题呢?

  • 首先,Binlog 是流式产生的,通过对 Binlog 的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对 MySQL 的访问压力上,都会有明显地改善。
  • 第二,Binlog 本身记录了数据变更的类型(Insert / Update / Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

3.Binlog 实时采集

对 Binlog 的实时采集包含两个主要模块:

  • 一是 CanalManager,主要负责采集任务的分配、监控报警、元数据管理以及和外部依赖系统的对接;
  • 二是真正执行采集任务的 CanalCanalClient

在这里插入图片描述
当用户提交某个 DB 的 Binlog 采集请求时,CanalManager 首先会调用 DBA 平台的相关接口,获取这一 DB 所在 MySQL 实例的相关信息,目的是从中选出最适合 Binlog 采集的机器。然后把 采集实例Canal Instance)分发到合适的 Canal 服务器 上,即 CanalServer 上。在选择具体的 CanalServer 时,CanalManager 会考虑负载均衡、跨机房传输等因素,优先选择负载较低且同地域传输的机器。

CanalServer 收到采集请求后,会在 ZooKeeper 上对收集信息进行注册。注册的内容包括:

  • 以 Instance 名称命名的永久节点。
  • 在该永久节点下注册以自身 ip:port 命名的临时节点。

这样做的目的有两个:

  • 高可用:CanalManager 对 Instance 进行分发时,会选择两台 CanalServer,一台是 Running 节点,另一台作为 Standby 节点。Standby 节点会对该 Instance 进行监听,当 Running 节点出现故障后,临时节点消失,然后 Standby 节点进行抢占。这样就达到了容灾的目的。
  • 与 CanalClient 交互:CanalClient 检测到自己负责的 Instance 所在的 Running CanalServer 后,便会进行连接,从而接收到 CanalServer 发来的 Binlog 数据。

对 Binlog 的订阅以 MySQL 的 DB 为粒度,一个 DB 的 Binlog 对应了一个 Kafka Topic。底层实现时,一个 MySQL 实例下所有订阅的 DB,都由同一个 Canal Instance 进行处理。这是因为 Binlog 的产生是以 MySQL 实例为粒度的。CanalServer 会抛弃掉未订阅的 Binlog 数据,然后 CanalClient 将接收到的 Binlog 按 DB 粒度分发到 Kafka 上。

4.离线还原 MySQL 数据

完成 Binlog 采集后,下一步就是利用 Binlog 来还原业务数据。首先要解决的第一个问题是把 Binlog 从 Kafka 同步到 Hive 上。

在这里插入图片描述

5.Kafka2Hive

整个 Kafka2Hive 任务的管理,在美团数据平台的 ETL 框架下进行,包括任务原语的表达和调度机制等,都同其他 ETL 类似。而底层采用 LinkedIn 的开源项目 Camus,并进行了有针对性的二次开发,来完成真正的 Kafka2Hive 数据传输工作。

6.对 Camus 的二次开发

Kafka 上存储的 Binlog 未带 Schema,而 Hive 表必须有 Schema,并且其分区、字段等的设计,都要便于下游的高效消费。对 Camus 做的第一个改造,便是将 Kafka 上的 Binlog 解析成符合目标 Schema 的格式。

对 Camus 做的第二个改造,由美团的 ETL 框架所决定。在我们的任务调度系统中,目前只对同调度队列的任务做上下游依赖关系的解析,跨调度队列是不能建立依赖关系的。而在 MySQL2Hive 的整个流程中,Kafka2Hive 的任务需要每小时执行一次(小时队列),Merge 任务每天执行一次(天队列)。而 Merge 任务的启动必须要严格依赖小时 Kafka2Hive 任务的完成。

为了解决这一问题,我们引入了 Checkdone 任务。Checkdone 任务是天任务,主要负责检测前一天的 Kafka2Hive 是否成功完成。如果成功完成了,则 Checkdone 任务执行成功,这样下游的 Merge 任务就可以正确启动了。

7.Checkdone 的检测逻辑

Checkdone 是怎样检测的呢?每个 Kafka2Hive 任务成功完成数据传输后,由 Camus 负责在相应的 HDFS 目录下记录该任务的启动时间。Checkdone 会扫描前一天的所有时间戳,如果最大的时间戳已经超过了 0 点,就说明前一天的 Kafka2Hive 任务都成功完成了,这样 Checkdone 就完成了检测。

此外,由于 Camus 本身只是完成了读 Kafka 然后写 HDFS 文件的过程,还必须完成对 Hive 分区的加载才能使下游查询到。因此,整个 Kafka2Hive 任务的最后一步是加载 Hive 分区。这样,整个任务才算成功执行。

每个 Kafka2Hive 任务负责读取一个特定的 Topic,把 Binlog 数据写入 original_binlog 库下的一张表中,即前面图中的 original_binlog.db,其中存储的是对应到一个 MySQL DB 的全部 Binlog。

在这里插入图片描述
上图说明了一个 Kafka2Hive 完成后,文件在 HDFS 上的目录结构。假如一个 MySQL DB 叫做 user,对应的 Binlog 存储在 original_binlog.user 表中。ready 目录中,按天存储了当天所有成功执行的 Kafka2Hive 任务的启动时间,供 Checkdone 使用。每张表的 Binlog,被组织到一个分区中,例如 userinfo 表的 Binlog,存储在 table_name=userinfo 这一分区中。每个 table_name 一级分区下,按 dt 组织二级分区。图中的 xxx.lzoxxx.lzo.index 文件,存储的是经过 lzo 压缩的 Binlog 数据。

8.Merge

Binlog 成功入仓后,下一步要做的就是基于 Binlog 对 MySQL 数据进行还原。Merge 流程做了两件事,首先把当天生成的 Binlog 数据存放到 Delta 表中,然后和已有的存量数据做一个基于主键的 Merge。Delta 表中的数据是当天的最新数据,当一条数据在一天内发生多次变更时,Delta 表中只存储最后一次变更后的数据。

把 Delta 数据和存量数据进行 Merge 的过程中,需要有唯一键来判定是否是同一条数据。如果同一条数据既出现在存量表中,又出现在 Delta 表中,说明这一条数据发生了更新,则选取 Delta 表的数据作为最终结果;否则说明没有发生任何变动,保留原来存量表中的数据作为最终结果。Merge 的结果数据会 Insert Overwrite 到原表中,即前面图中的 origindb.table

9.Merge 流程举例

下面用一个例子来具体说明Merge的流程。

在这里插入图片描述
数据表共 idvalue 两列,其中 id 是主键。在提取 Delta 数据时,对同一条数据的多次更新,只选择最后更新的一条。所以对 id=1 的数据,Delta 表中记录最后一条更新后的值 value=120。Delta 数据和存量数据做 Merge 后,最终结果中,新插入一条数据(id=4),两条数据发生了更新(id=1id=2),一条数据未变(id=3)。

默认情况下,我们采用 MySQL 表的主键作为这一判重的唯一键,业务也可以根据实际情况配置不同于 MySQL 的唯一键。

上面介绍了基于 Binlog 的数据采集和 ODS 数据还原的整体架构。下面主要从两个方面介绍我们解决的实际业务问题。

10.实践一:分库分表的支持

随着业务规模的扩大,MySQL 的分库分表情况越来越多,很多业务的分表数目都在几千个这样的量级。而一般数据开发同学需要把这些数据聚合到一起进行分析。如果对每个分表都进行手动同步,再在 Hive 上进行聚合,这个成本很难被我们接受。因此,我们需要在 ODS 层就完成分表的聚合。

在这里插入图片描述
首先,在 Binlog 实时采集时,我们支持把不同 DB 的 Binlog 写入到同一个 Kafka Topic。用户可以在申请 Binlog 采集时,同时勾选同一个业务逻辑下的多个物理 DB。通过在 Binlog 采集层的汇集,所有分库的 Binlog 会写入到同一张 Hive 表中,这样下游在进行 Merge 时,依然只需要读取一张 Hive 表。

第二,Merge 任务的配置支持正则匹配。通过配置符合业务分表命名规则的正则表达式,Merge 任务就能了解自己需要聚合哪些 MySQL 表的 Binlog,从而选取相应分区的数据来执行。

这样通过两个层面的工作,就完成了分库分表在 ODS 层的合并。

这里面有一个技术上的优化,在进行 Kafka2Hive 时,我们按业务分表规则对表名进行了处理,把物理表名转换成了逻辑表名。例如 userinfo123 这张表名会被转换为 userinfo,其 Binlog 数据存储在 original_binlog.user 表的 table_name=userinfo 分区中。这样做的目的是防止过多的 HDFS 小文件和 Hive 分区造成的底层压力。

11.实践二:删除事件的支持

Delete 操作在 MySQL 中非常常见,由于 Hive 不支持 Delete,如果想把 MySQL 中删除的数据在 Hive 中删掉,需要采用 “迂回” 的方式进行。

对需要处理 Delete 事件的 Merge 流程,采用如下两个步骤:

  • 首先,提取出发生了 Delete 事件的数据,由于 Binlog 本身记录了事件类型,这一步很容易做到。将存量数据(表 A)与被删掉的数据(表 B)在主键上做左外连接(Left outer join),如果能够全部 join 到双方的数据,说明该条数据被删掉了。因此,选择结果中表 B 对应的记录为 NULL 的数据,即是应当被保留的数据。
  • 然后,对上面得到的被保留下来的数据,按照前面描述的流程做常规的 Merge。

在这里插入图片描述

12.总结与展望

作为数据仓库生产的基础,美团数据平台提供的基于 Binlog 的 MySQL2Hive 服务,基本覆盖了美团内部的各个业务线,目前已经能够满足绝大部分业务的数据同步需求,实现 DB 数据准确、高效地入仓。在后面的发展中,我们会集中解决 CanalManager 的单点问题,并构建跨机房容灾的架构,从而更加稳定地支撑业务的发展。

本文主要从 Binlog 流式采集和基于 Binlog 的 ODS 数据还原两方面,介绍了这一服务的架构,并介绍了我们在实践中遇到的一些典型问题和解决方案。希望能够给其他开发者一些参考价值,同时也欢迎大家和我们一起交流。


本文转载于:

  • 作者:美团技术团队
  • 标题:美团DB数据同步到数据仓库的架构与实践
  • 链接:https://tech.meituan.com/2018/12/06/binlog-dw.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/107541.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html5学习笔记21-css简略学习

CSS https://www.runoob.com/css/css-tutorial.html CSS (Cascading Style Sheets,层叠样式表),是一种用来为结构化文档(如 HTML 文档或 XML 应用)添加样式(字体、间距和颜色等)的计算机语言&a…

Windows环境下Elasticsearch相关软件安装

Windows环境下Elasticsearch相关软件安装 本文将介绍在 windows 环境下安装 Elasticsearch 相关的软件。 1、安装Elasticsearch 1.1 安装jdk ElasticSearch是基于lucence开发的,也就是运行需要java jdk支持,所以要先安装JAVA环境。 由于ElasticSear…

合宙Air724UG LuatOS-Air LVGL API控件-二维码(Qrcode)

二维码(Qrcode) 示例代码 qrcodelvgl.qrcode_create(lvgl.scr_act(),nil)lvgl.qrcode_set_txt(qrcode,"https://doc.openluat.com/home")lvgl.obj_set_size(qrcode,400,400)lvgl.obj_align(qrcode, nil, lvgl.ALIGN_CENTER, 0, 0)创建 可以通…

【服务器 | 宝塔】宝塔面板卸载重装教程:清理删除宝塔面板并重新开始

宝塔面板卸载重装怎么操作?我们很多用户可能安装宝塔之后会经常看一下有没有新版本,如果有新版直接右上角 宝塔面板卸载重装怎么操作?我们很多用户可能安装宝塔之后会经常看一下有没有新版本,如果有新版直接右上角”更新”升级一下版本就可以了&#…

【聚类】DBCAN聚类

OPTICS是基于DBSCAN改进的一种密度聚类算法,对参数不敏感。当需要用到基于密度的聚类算法时,可以作为DBSCAN的一种替代的优化方案,以实现更优的效果。 原理 基于密度的聚类算法(1)——DBSCAN详解_dbscan聚类_root-ca…

OIDC协议 — 理解OIDC身份认证授权

1、OIDC协议与OAuth2协议的区别 1.1 OAuth2 协议 我们在日常开发中,OAuth2协议可能接触比较多,例如用户权限系统集成Spring Security OAuth2进行身份授权认证;那什么是OAuth2协议? OAuth 即 Open Authorization ,用于…

React-Hooks 和 React-Redux

注:Redux最新用法参考 个人React专栏 react 初级学习 Hooks基本介绍------------------------- Hooks:钩子、钓钩、钩住, Hook 就是一个特殊的函数,让你在函数组件中获取状态等 React 特性 ,是 React v16.8 中的新增功…

分类预测 | MATLAB实现WOA-CNN-BiGRU鲸鱼算法优化卷积双向门控循环单元数据分类预测

分类预测 | MATLAB实现WOA-CNN-BiGRU鲸鱼算法优化卷积双向门控循环单元数据分类预测 目录 分类预测 | MATLAB实现WOA-CNN-BiGRU鲸鱼算法优化卷积双向门控循环单元数据分类预测分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.Matlab实现WOA-CNN-BiGRU多特征分类…

PyQt5安装以及使用

PyQt5安装以及使用 1 简介2 安装方法2.1 pip install PyQt52.2 pip install PyQt5‑5.15.9‑cp37‑cp37m‑win_amd64.whl 3 Qt相关工具介绍4 安装PyQt5-tools5 设置环境变量6 测试7 Qt Designer7.1 安装7.2 在 PyCharm 中配置 Qt Designer7.3 在PyCharm中测试Qt Designer 8 将u…

使用ExcelJS快速处理Node.js爬虫数据

什么是ExcelJS ExcelJS是一个用于处理Excel文件的JavaScript库。它可以让你使用JavaScript创建、读取和修改Excel文件。 以下是ExcelJS的一些主要特点: 支持xlsx、xlsm、xlsb、xls格式的Excel文件。可以创建和修改工作表、单元格、行和列。可以设置单元格样式、字…

android studio安卓模拟器高德SDK定位网络连接异常

背景 使用了高德SDK创建了一个 project, 下面是运行界面: 点击 "开始定位"按钮, 结果并没有返回定位信息, 而是报错了: 根据错误提示打开这个网址: https://lbs.amap.com/api/android-location-sdk/guide/utilities/errorcode, 并且找到错误码 4 的信息, 显示的是网…

【2023年11月第四版教材】第12章《质量管理》(第三部分)

第12章《质量管理》(第三部分) 5 管理质量5.1 管理质量★★★ (17下9)5.2 数据分析★★★5.3 数据表现★★★5.4 审计★★★ 6 控制质量6.1 控制质量6.2 数据收集★★★6.3 数据分析 ★★★6.4 数据表现★★★ 5 管理质量 组过程…