ES 数据写入方式:直连 VS Flink 集成系统

ES 作为一个分布式搜索引擎,从扩展能力和搜索特性上而言无出其右,然而它有自身的弱势存在,其作为近实时存储系统,由于其分片和复制的设计原理,也使其在数据延迟和一致性方面都是无法和 OLTP(Online Transaction Processing)系统相媲美的。

也正因如此,通常它的数据都来源于其他存储系统同步而来,做二次过滤和分析的。这就引入了一个关键节点,即 ES 数据的同步写入方式,本文介绍的则是 MySQL 同步 ES 方式。

将 MySQL 数据写入 ES,首先想到的一定是消费 Binlog 直连 ES 写入,这种方式简单明了,然而如果稍微考量维度多一点,就会发现该方式的一些弊端。因此还有另外一个方式,即【RocketMQ + Flink Consumer + ES Bulk】集成生态,我们将从同步延迟、消费特性,ES 写入性能、系统容灾能力四个方面评估这两种接入方式,希望给到大家灵感并选择适合业务的同步方式。

ES 基础写入原理

ES 写入属于追加式写入,先形成特定大小的 Segment,然后定时 Merge 小数据段为大数据段以减少内存碎片,提升查询效率的过程。一个 Index 由 N 个 Shard 及其副本构成,存储了同一种 Type 类型的 Documents,由 Mapping 定义了其索引方式,每一个 Shard 由 N 个 Segment 组成,每个 Shard 都是一个全功能且完整的 Lucene 索引,它是 ES 的最小处理单元;Segment 是 ES 最小的数据处理单位,每个 Segment 都是一个独立的倒排索引。

ES 写入其实是不断将数据写入到同一个 Segment(内存),然后触发 Refresh 刷新,将 Segment 刷新到 OS Cache(默认 1s),此时数据就可以查询到了,OS Cache 会由操作系统触发 Flush 操作持久化到磁盘。

引发思考:ES 是如何保证数据不丢失的呢?追加式写入的优劣点是什么?追加式写入是如何处理数据更新问题的?MySQL 是属于哪种写入方式呢?本文重点不在此处,大家可以另行查阅文章。

ES 基本概念

ES 写入过程

ES 直连写入

采用 ES 直连写入的优点是因为路径短,依赖组件少,加上 Dsyncer(异构存储转换系统)通常已经提供了完善的限流重试机制,所以消费延迟和消费的数据完整性都是可以保证的。

缺点:

  1. 不易于接入多机房容灾部署,目前 ES 容灾机房都属于独立部署,独立读写模式,所以如果采用该方式,则难以同时对多机房写入分别做管控,达不到容灾效果。Binlog-->Dsyncer 通常一个 MySQL Table 对应一个转换任务,如果为了写多机房起多个重复的转换任务,则显得有些愚笨。

  2. 如果自身业务场景有对同一条记录并发写场景,但写不一定全部来源于 Binlog 的情况下,那全局考虑直写 ES 则更容易遇到写入冲突问题,因为缺乏有序队列的保障。

通过 Flink 搭建 ES 集成系统

Flink 搭建 ES 集成系统,则指的是所有的 ES 写入都由 Flink 任务完成,Flink 监听 RocketMQ 实时数据流,既保证了数据的分区有序性,又充分利用了 ES 的批量写入能力,ES 的批量写入能力比单条写入性能高出多倍。同时由于 Flink 本身的容错性,即使在异常场景下,也能保证数据的最终一致性。

优点

  1. 通过 MQ 可以更快捷的接入多机房 ES 集群,写入解耦,三机房分别起消费者写入数据,彼此独立,当出现单机房故障时,只要有可用机房,直接处理读流量切流即可,容灾方案简单清晰

  2. 网络抖动等问题会导致 ES 暂时性写入失败时,不影响其他集群写入的情况下,RocketMQ 会暂存消息,Flink 会保存消费快照,不断重试直至成功,更好的保障了数据最终一致性

  3. 多数据源写入能保证全局分区一致性。

缺点

  1. 依赖了更多组件,会增加全链路数据同步延迟,而 ES 默认的 Refresh 频率是每秒一次,经测试该链路正常情况下数据延迟都是秒级的,不是完全不可接受;

  2. 依赖了更多组件,对基础组件的稳定性有更高的要求,RocketMQ 异常,或者 Flink 任务异常都会导致同步链路出现问题,增加一定的业务异常风险。

在这里需要注意的一个问题是有人可能会考虑接入多机房 ES 集群,是怎么保证多机房同时成功的、以及怎么保证写入成功后就可以查询得到?目前这两点暂时无法做到,因为多个机房都是独立写入的,互不影响,且 ES 集群属于弱数据一致性集群,无法保证写入成功立刻就能查到。

搭建并运行一个 ES Flink 消费程序的必备条件

  • Flink 运行环境:首先需要有 Flink 任务的运行环境,通常企业级的 Flink 任务会作为一个 YARN 作业在分布式系统中被调度并分配资源执行,但同时 Flink 也可作为单机进程,亦或搭建一个独立集群运行。

  • ES 消息格式:需要约定一种 ES 消息传输格式和序列化方式,一套范式解决所有同步场景,目前流行的序列化方式是 pb 格式或 json 格式,目前我们都是推荐使用 pb 格式的,数据格式 Schema 定义:

字段名

值类型

必需/可选

描述

_index

string

必需

文档要写入索引的名称或别名

_type

string

必需/可选

文档的类型

_op_type

string

必需

文档写入操作类型,取值范围: index, create, update, upsert, delete

_id

string

可选

文档 ID,不指定时写入 ES 会自动生成,但同一条数据被重复消费写入 ES 会生成多个文档

_routing

string

可选

文档路由,不指定时默认使用 _id 字段值路由

_version

int64

可选

文档版本,指定时大于 0 且仅操作为 index/delete 有效,默认使用 external_gte 版本类型

_source

object

必需/可选

文档内容,操作类型为 delete 时可不指定

_script

object

可选

文档脚本,操作类型为 update/upsert 时有效,但和 _source 不能同时存在

syntax = "proto3";message ESIndexInfo {string Name = 1;  // 文档要写入索引的名称或别名
}enum ESOPType { // 文档写入操作类型DELETE = 0; // 删除文档INDEX = 1;  // 创建新文档或更新老文档,只能全量更新 (替换老文档)UPDATE = 2; // 更新老文档,支持部分更新 (合并老文档)UPSERT = 3; // 创建新文档或更新老文档,支持部分更新 (合并老文档)CREATE = 4; // 创建新文档,存在时报错丢弃
}message ESDocAction {ESIndexInfo IndexInfo = 1; // 索引信息 (必需)ESOPType OPType = 2;       // 操作类型 (必需)string ID = 3;             // 文档 ID (可选)string Doc = 4;            // 文档内容 (JSON 格式, 删除操作时不需要)int64 Version = 5;         // 文档版本 (可选, 大于 0 且操作为 index/create/delete 有效)string Routing = 6;        // 文档路由 (可选, 非空有效)string Script = 7;         // 文档脚本 (JSON 格式, 操作类型为 update/upsert 有效,但和 Doc 不能同时存在)
}
  • Flink 任务必要配置:监听的 RocketMQ Topic 信息,写 ES 集群信息;

  • Flink 执行函数:Flink 处理流式消息有流式 SQL 和自定义应用程序两种方式,流式 SQL 约束于本身的一些限制,比如不支持同一个 MQ 有多个索引消息,而自定义编程更加灵活,比如添加各种打点,日志,错误码处理等,推荐该方式;

  • Flink 资源配置:JobManager 资源配置,TaskManager 资源配置等等;

  • Flink 自定义参数配置:可以自定义一些与应用程序紧密相关的动态配置,方便动态调节 Flink 消费能力,比如:

参数名

用途

默认值

job.writer.connector.bulk-flush.max-actions

单次 bulk 最大文档数,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 300

job.writer.connector.bulk-flush.max-size

单次 bulk 最大字节数,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 10MB

job.writer.connector.bulk-flush.interval

两次 bulk 最大间隔,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 1000ms

job.writer.connector.global-rate-limit

全局写入限速值

默认 -1,不限速

job.writer.connector.failure-handler

指定自定义失败处理器,比如处理4xx错误,5xx错误的方式不同,429总是无限重试等;

global_parallelism_num

flink 任务全局并发度

rmq 是 queue/4,bmq/kafka 是 partition/3

max_parallelism_num

flink 任务最大并发度

mq 的 queue/partition 的个数

checkpoint_interval

创建 Checkpoint 的间隔,单位 ms (5min=300000)

默认 15min

checkpoint_timeout

创建 Checkpoint 的超时时间,单位 ms (5min=300000)

默认 10min

rebalance_enable

开启乱序消费

默认 false

对比建议

写入方式

同步延迟

写入特性

ES写入性能

消费者

容灾能力

直连

依赖组件少,延迟低

Binlog 单 key 有序

bulk写入

FaaS

较差

RocketMQ+Flink+ES

依赖组件多,延迟较高/秒级

全局单 key 有序

bulk写入

Flink

经过以上介绍如果业务在都可接受秒级延迟的条件下,使用 RocketMQ+Flink 的方式能够更好的实现有序性和容灾能力,Flink 在流式任务处理能力上也远优 FaaS,但是直连方式明显链路更加简洁,架构更加轻量,系统集成和维护成本较低,所以还是需要依照业务特性选择最适合的才是最好的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/703295.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高效稳定,AH1515-15V转12V8A稳压芯片成为各种设备首选

高效稳定,AH1515 15V转12V稳压芯片成为各种设备首选 随着科技的不断发展,各种电子设备对电源的要求越来越高,尤其是电压稳定性和电流输出能力。为了解决这一问题,我国一款名为AH1515的稳压芯片应运而生,凭借其优异的性…

FBX Python SDK 入门教程

安装 1. 下载 FBX Python SDK 官网地址,根据你的操作系统,下载相应的 FBX SDK 2020.3.4 Python exe 文件。 2. 安装 执行下载的文件 fbx202034_fbxpythonsdk_win.exe,安装完的程序路径我的是 D:\Program Files\Autodesk\FBX\FBX Python SDK\…

Django创建网站的地基

相关文档 1、为新网站创建一个文件夹(这里是:locallibrary) D:\django>mkdir locallibraryD:\django>cd locallibraryD:\django\locallibrary>dirVolume in drive D is 新加卷Volume Serial Number is B68C-03F7Directory of D:\dj…

vue+element 验证输入框是否包含特殊字符(赠送ip+域名双验证判断)

1 效果: 2 添加element自定义验证方法 rules{ dbConfig.address: [ { required: true, validator: validatehost_ip, trigger: blur } ] } 3 添加对应方法 var validatehost_ip (rule, value, callback) > {if (!value) {callback(new Error(地址不可为空))} else {//验…

图书个性化推荐系统的设计与实现

springboot003图书个性化推荐系统的设计与实现 亲测完美运行带论文:获取源码,私信评论或者v:niliuapp 运行视频 包含的文件列表(含论文) 数据库脚本:db.sql其他文件:ppt.pptx论文/文档:论文…

什么是ARP攻击,怎么做好主机安全,受到ARP攻击有哪些解决方案

在数字化日益深入的今天,网络安全问题愈发凸显其重要性。其中,ARP攻击作为一种常见的网络攻击方式之一,往往给企业和个人用户带来不小的困扰。ARP协议是TCP/IP协议族中的一个重要协议,负责把网络层(IP层)的IP地址解析为数据链路层…

代购系统搭建,淘宝、1688海外代购系统建设以及部分前端源码展示

客户登录主界面&#xff0c;可以根据个人需求更换。 可支持个人定制模块化&#xff0c;也有一些模块可供选择 系统演示站测试 部分源码展示&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"> <title>会员中心 – 淘…

JavaScript引入方式

JS引入方式 1 内部脚本方式引入2 外部脚本方式引入 1 内部脚本方式引入 说明 在页面中,通过一对script标签引入JS代码script代码放置位置具备一定的随意性,一般放在head标签中居多 代码 <!DOCTYPE html> <html lang"en"><head><meta charset…

NGINX SPRING HTTPS证书

服务器&#xff1a;xxx.xxx.xxx.56 客户端器&#xff1a;xxx.xxx.xxx.94##生成服务器证书和密钥容器 keytool -genkey -alias tas-server -keypass 250250 -keyalg RSA -keysize 2048 -validity 3650 -keystore D:\https证书\tas-server.jks -storepass 250250 -dname "C…

STL—string类(1)

一、string类 1、为什么要学习string&#xff1f; C语言中&#xff0c;字符串是以\0结尾的一些字符的集合&#xff0c;为了操作方便&#xff0c;C标准库中提供了一些str系列的库函数&#xff0c;但是这些库函数与字符串是分离开的&#xff0c;不太符合OOP&#xff08;面向对象…

Google I/O 2024 干货全解读:Gemini AI 横空出世,智能未来触手可及!

Google I/O 2024 干货全解读&#xff1a;Gemini AI 横空出世&#xff0c;智能未来触手可及&#xff01; 博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》…