认识 TapFlow,以编程方式运行 TapData

news/2025/3/4 11:44:35/文章来源:https://www.cnblogs.com/tapdata/p/18750189

**什么是TapFlow? **

TapFlow 是 TapData Live Data Platform 最新推出的一个面向编程的API 框架。TapFlow 可以让开发者和数据工程师用一个简单易用而又强大的编程语言来进行数据管道和数据模型的开发工作。
这次的发布包括一个 Python 的SDK。TapFlow 需要连接一个 TapData Cluster(可以是企业版,云版或社区版)才能运行。

为何需要编程式的方式?

TapData 目前提供的是一个以可视化拖拉拽方式来构建数据管道,数据开发的UI界面。 UI界面在易使用和易运维上有很大的优势,但是在不少地方也有一些局限性。TapFlow 希望能够对这些局限性提供一个有效的补充。

  1. 满足开发者的深度定制需求,复杂处理逻辑需要JS和Python代码

在一些复杂脱敏逻辑,或者定制化的数据字段值标准化的时候需要使用不少代码。这个时候如果有频繁的改动,UI界面的代码操作就不会很方便

  1. **更好地支持 CI/CD 和自动化 **

团队需要定期部署和更新多个环境中的数据集成任务(如开发、测试、生产)。使用编程式 API,可以通过脚本自动完成任务的生成和迁移,轻松与 Git 等版本管理工具集成,而无需人工在 GUI 中操作。

  1. 降低复杂场景下的操作成本

经常有用户需要同步 100+ 个数据库表,且每张表有不同的字段映射规则。在 GUI 中手动设置这些规则不仅繁琐,而且易出错,而通过编程式 API 可以实现规则的自动化生成。

  1. 面向开发者与技术团队的友好性,更容易集成到工作流

提供面向开发者的工具,更贴近他们的工作方式,更容易和其他业务模块进行集成。
开放代码能力,产品更容易扩展,如增加企业内部的可复用组件等。

TapFlow 使用示例

我们假设我们有一个CRM 应用运行在MySQL 数据库上,下面是这个 MySQL 库的schema:

由于查询性能的考量和一些特定的宽表需求,我们需要将订单数据复制到MongoDB里面来发布一个订单查询API。我们会用 TapFlow 把数据从MySQL 里面复制到MongoDB, 过程中对数据做一些加工处理和合并的操作。

**安装 TapFlow **

# pip3 install tapflow

TapFlow 的Python SDK 支持两种模式: 以程序方式执行,或在交互模式下运行。接下来我们以交互模式下来展现如何使用 TapFlow API。

`# tap

--- Please configure TapData cluster ---

Tap Flow requires TapData Live Data Platform(LDP) cluster to run.
If you would like to use with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]):`

在你输入相应的 TapData 连接信息或者密钥后,就可以开始在交互模式下使用。

更多安装信息可以参见 Quick Start 文档: https://docs.tapdata.net/tapflow/quick-start

创建一个源库和目标库连接

`tap> mysql_conn = DataSource('mysql', 'MySQL_ECommerce',
{
'database': 'ECommerceData',
'port': 3306,
'host': 'demo.tapdata.io',
'username': 'demouser',
'password': 'demopass'
})
.type('source')
.save()

tap> mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
{
"uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
})
.type("target")
.save()`

创建一个简单的订单表的复制任务(Data Flow)

目的:将 MySQL 的订单数据同步到 MongoDB 中做统一订单查询

tap> myflow = Flow("simple_data_replication_flow") \ .read_from("MySQL_ECommerce.ecom_orders") \ .write_to("MongoDB_ECommerce.orders_replica_collection") \ .save() tap> myFlow.start()

过滤一些字段
目标表不需要一些字段

tap> myflow = Flow("order_fields_exclude_flow") \ .read_from("MySQL_ECommerce.ecom_orders") \ .exclude("order_delivered_carrier_date", "order_delivered_customer_date") \ .write_to("MongoDB_ECommerce.orders_renamed_fields_collection") \ .save() tap> myFlow.start()

字段改名

tap> myflow = Flow("order_fields_rename_flow") \ .read_from("MySQL_ECommerce.ecom_orders") \ .rename_fields({'order_purchase_timestamp': 'purchase_timestamp'}) \ .write_to("MongoDB_ECommerce.orders_exclude_fields_collection") \ .save() tap> myFlow.start()

用Python 脚本做复杂自定义处理

`# 标准化 order_status 的字段值
tap> def pyfunc(record):
if not record['order_status'] :
record['order_status'] = 'invalid'
if record['order_status'] == 'SendError' :
record['order_status'] = 'undeliverable'
return record # 返回处理后的记录

创建数据流任务,应用 Python 函数,并将结果写入目标数据库

tap> myflow = Flow("orders_complex_data_processing_flow")
.read_from("MySQL_ECommerce.ecom_orders")
.py(pyfunc)
.write_to("MongoDB_ECommerce.orders_processed_collection")
.save()
tap> myFlow.start()`

在订单表里通过 lookup 补齐客户信息

tap> myflow = Flow("orders_lookup_flow") \ .read_from("MySQL_ECommerce.ecom_orders") \ .lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]]) .write_to("MongoDB_ECommerce.wide_orders_collection") \ .save() tap> myFlow.start()

最后的结果

`tap> use MongoDB_ECommerce
datasource switch to: MongoDB_ECommerce

tap> peek wide_orders_collection
table wide_orders_collection has 12641 records
{
'order_status': 'unavailable',
'customer_state': 'SP',
'customer_unique_id': 'a77550dd00887c5bb24100ccbd08cbe9',
'order_estimated_delivery_date': '2017-11-03 00:00:00',
'_id': '676154705338b293ccd85c80',
'customer_id': '3a92efdb6e6163dc1734d44f2f5f6d04',
'order_id': '0010dedd556712d7bb69a19cb7bbd37a',
'purchase_timestamp': '2017-10-21 19:32:06',
'order_approved_at': '2017-10-24 03:25:32',
'customer_city': 'sao paulo',
'customer_zip_code_prefix': '04851'
}
{
'order_status': 'shipped',
'customer_state': 'BA',
'customer_unique_id': '205d5aa158338f2b733a07326aae8c87',
'order_estimated_delivery_date': '2018-04-16 00:00:00',
'_id': '676154705338b293ccd85c82',
'customer_id': '7fa80efb1ef15ca4104627910c29791c',
'order_id': '002f19a65a2ddd70a090297872e6d64e',
'purchase_timestamp': '2018-03-21 13:05:30',
'order_approved_at': '2018-03-21 13:15:27',
'customer_city': 'camacari',
'customer_zip_code_prefix': '42804'
}
{
'order_status': 'canceled',
'customer_state': 'MG',
'customer_unique_id': 'ec979208947bbba310f2ad8e50963725',
'order_estimated_delivery_date': '2018-08-29 00:00:00',
'_id': '676154705338b293ccd85c84',
'customer_id': '0dad07848c618cc5a4679a1bfe1db8d2',
'order_id': '00310b0c75bb13015ec4d82d341865a4',
'purchase_timestamp': '2018-08-15 14:29:08',
'order_approved_at': '2018-08-15 15:04:25',
'customer_city': 'belo horizonte',
'customer_zip_code_prefix': '31160'
}
{
'order_status': 'unavailable',
'customer_state': 'BA',
'customer_unique_id': 'cd88b962adbc4b174353217f99dc6174',
'order_estimated_delivery_date': '2018-01-08 00:00:00',
'_id': '676154705338b293ccd85c85',
'customer_id': '3d2f26eab3f79dd1fe9977f615e70c2f',
'order_id': '00a500bc03bc4ec968e574c2553bed4b',
'purchase_timestamp': '2017-11-23 10:53:01',
'order_approved_at': '2017-11-25 10:54:38',
'customer_city': 'salvador',
'customer_zip_code_prefix': '41180'
}
{
'order_status': 'shipped',
'customer_state': 'BA',
'customer_unique_id': '60ec651482858c327c177cf9360cc0a2',
'order_estimated_delivery_date': '2018-09-18 00:00:00',
'_id': '676154705338b293ccd85c87',
'customer_id': '7a399396442d5601cbedfbd0a3cf1da4',
'order_id': '00a99c50fdff7e36262caba33821875a',
'purchase_timestamp': '2018-08-17 16:25:04',
'order_approved_at': '2018-08-17 16:35:18',
'customer_city': 'teixeira de freitas',
'customer_zip_code_prefix': '45990'
}`

更多的交互式命令参考可以看这个文档:https://docs.tapdata.net/tapflow/tapcli-reference

以 Python 程序方式来运行上面的代码

`# cat order_mview.py
from tapflow.lib import *
from tapflow.cli.cli import init

mysql_conn = DataSource('mysql', 'MySQL_ECommerce',
{
'database': 'ECommerceData',
'port': 3306,
'host': 'demo.tapdata.io',
'username': 'demouser',
'password': 'demopass'
})
.type('source')
.save()

mongodb_conn = DataSource("mongodb", "MongoDB_ECommerce",
{
"uri": "mongodb://your_username:your_passwd@192.168.1.18:27017/ecommerce?authSource=admin"
})
.type("target")
.save()

def pyfunc(record):
if not record['order_status'] :
record['order_status'] = 'invalid'
if record['order_status'] == 'SendError' :
record['order_status'] = 'undeliverable'
return record

myflow = Flow("mysql_order_flow")
.read_from("MySQL_ECommerce.ecom_orders")
.exclude("order_delivered_carrier_date", "order_delivered_customer_date")
.rename_fields({'order_purchase_timestamp': 'purchase_timestamp'})
.py(pyfunc)
.lookup("MySQL_ECommerce.ecom_customers", relation=[["customer_id", "customer_id"]])
.write_to("MongoDB_ECommerce.wide_orders_collection")
.save()

myflow.start() # Start the flow

python order_mview.py`

已知问题

  • Lookup 目前只能用 MongoDB 作为目标

TapFlow 路线图

我们将持续改进 TapFlow的能力,以下是一些中长期的路线图功能:

  • Lookup 支持除了 MongoDB 以外的更多目标库
  • 支持项目工程 Project
  • 支持 inner join
  • 提供 Java SDK / RESTful API

关于 TapData Live Data Platform:

如果你还不是很熟悉,TapData 是为企业内部的实时数据集成和实时数据服务专门打造的一个实时数据平台。它具有以下特点:

  1. 专为实时数据管道而设计的框架: 基于CDC技术,数据采集及处理延迟可在亚秒级
  2. 高性能:单节点每秒可处理数十万条记录
  3. 内置丰富的 CDC(Change Data Capture)连接器:快速对接 Oracle, DB2, Sybase, MySQL, PostgreSQL, MSSQL 等
  4. 丰富的实时数据处理功能:过滤,改名,增删字段
  5. 多表关联: 构建持续更新的物化视图,数据汇聚
  6. UDF: Javascript 或 Python 自定义function,处理复杂逻辑
  7. 至少一次和精确一次的一致性保障
  8. 完善的数据校验: 全量/增量校验,哈希校验,二次校验等
  9. 国产数据库支持:Dameng, Kingbase, GaussDB, OceanBase, GBase, VastBase
  10. Kafka 支持:作为生产者把数据库事件直接推送给Kafka,或从Kafka 队列消费事件
  11. 支持私有化和全托管:可以开源版线下部署,或直接使用TapData Cloud的全托管服务

TapData 的常见应用场景包括:

  • 替代 Oracle Golden Gate 进行数据库间实时同步
    传统数据库复制工具往往成本高昂且复杂,TapFlow 提供了一种轻量级且易用的替代方案,帮助您在不同的数据库之间高效同步数据,支持从 MySQL 到 PostgreSQL,甚至是 NoSQL 数据库。

  • 替代 Kafka 构建实时数据管道
    对于那些需要实时传输数据的场景,TapFlow 是一个强有力的替代方案。它无需部署复杂的 Kafka 集群,而是直接通过轻量化的方式提供同等甚至更高效的数据管道构建能力。

  • 创建持续刷新的物化视图,用来做查询加速,读写分离等
    当业务需要实时查询最新的数据结果时,物化视图是一种高效的方式。TapFlow 可以持续刷新物化视图,保证数据的实时性,从而支持实时分析与决策。

  • 数据实时入仓入湖
    现代数据分析的趋势是实时化,TapFlow 可以将数据实时写入数据仓库或数据湖(如Apache Doris, Clickhouse, 或者云数仓如 Ali Cloud ADB, SelectDB, BigQuery)

  • 通用流式 ETL 数据处理
    TapFlow 同样支持复杂的 ETL(抽取、转换、加载)任务,借助 Python 的灵活性和内置的处理能力,开发者可以轻松处理复杂的数据转换需求。

加入社区

欢迎加入我们的社区互动,你可以:

  • 在 Github 上面 帮我们点个星星
  • 加入我们的社区微信群

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/893348.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Maya 影视渲染,渲染101 让创作无压力!

Maya 创作时,渲染是不是常让人崩溃?漫长等待、电脑性能不足、报错频出,今天就给大家分享基于渲染 101 平台的 Maya 云渲染,轻松解决这些难题!告别漫长等待,效率飙升**** 自己电脑渲染复杂 Maya 项目,耗时久,进度慢。渲染 101 的云渲染有强大计算集群,众多高性能服务器…

Java SpringBoot 升级后,编译打包都没问题,运行报错

编译打包都没问题,运行报错 10:36:39,587 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@5966cc - Registering current configuration as safe fallback point Exception in thread "main" java.lang.NoClassDefFoundError: org/springframework/core…

在Hyper-V虚拟化平台上,怎么创建和管理虚拟机呢?

确实,在Hyper-V虚拟化平台上,创建和管理虚拟机(VMs)是实现资源高效利用和业务灵活部署的关键。以下是对这一观点的详细阐述:一、创建虚拟机:资源高效利用的基础 资源分配与优化: 在创建虚拟机时,管理员需要根据业务需求合理分配CPU、内存、存储和网络等资源。通过精确的…

STM32实战——ESP8266 WIFI模块

此篇博文提供了ESP8266的开发指南,包括在STM32上使用ESP8266进行WiFi连接、发送和接收HTTP请求以及在ESP8266中使用AT指令发送GET方式请求等内容。ESP8266 硬件介绍 ESP8266系列模组有哪些:在本实验中,ESP8266与ESP-01不做区分。 ESP-01引脚介绍:引脚 功能3.3 3.3V供电,避…

一招学会Prometheus对接三方监控平台

文章来源:乐维社区 通过将Prometheus与不同的监控工具和服务集成,企业可以实现对更广泛资源和服务的监控,包括那些不由Prometheus原生支持的系统。这种集成不仅有助于获取更全面、深入的监控数据,还能提升故障排查和性能优化的效率,从而确保系统的稳定性和可靠性。 环境说…

Motoman机器人XRC控制柜维修

在现代工业生产中,YASKAWA机器人扮演着至关重要的角色。然而,如同所有的机械设备一样,YASKAWA机器人也会出现故障,尤其是其控制柜部分。因此,安川机器人维修工作对于保障机器人的正常运行意义非凡,这其中安川机器人控制柜维修更是关键环节。一、常见故障及解决方法1. 电源…

【域攻击】无文件落地攻击:msiexec

msiexec.exe属于系统进程,是Windows Installer的一部分,用于安装Windows Installer安装包(MSI),对系统的正常运行非常重要,一般在运行Microsoft Update安装更新或安装部分软件的时候出现,占用内存比较大,我们亦可以使用其作为无文件落地的媒介,下面举例说明: Step 1:使…

c# hosting 和 AppDomain

前言 简单介绍一下hosting 和 appdomain。 这两个东西。 正文 我们经常听说寄宿,这个寄宿是什么东西呢? 也就是我们这里要介绍的hosting。 什么是寄宿呢? 想一个问题,一个c# 的exe 程序,为啥能够启动呢?我们记得dll中是il代码。 那么肯定要加载运行时对吧。 那么为什么叫…

Dapr 简介 - 分布式应用运行时

Dapr 简介-分布式应用运行时 简介 Whats Dapr? Dapr = Distributed Application Runtime (分布式应用运行时)看到这个词,不知道大家想到了什么?是不是类似于 java runtime,c runtime 这类词汇。网上找到了一个 应用程序、运行时库和 OS 之间的关系图,其中 Runtime 库处于…

Mac脚本发布PHP开发项目到线上

环境 服务器: CentOS Linux release 7.6.1810 (Core) 客户端: macOS Sonoma 14.1.2 (23B92) 测试环境:开发使用ftp自动上传到/www/wwwroot/dirDev文件夹自动备份并发布程序 备份/www/wwwroot/dirOnline文件夹,并将开发项目/www/wwwroot/dirDev自动同步到线上环境/www/wwwro…

Mac发布PHP开发项目到线上项目

环境 服务器: CentOS Linux release 7.6.1810 (Core) 客户端: macOS Sonoma 14.1.2 (23B92) 开发使用ftp自动上传到/www/wwwroot/dirDev文件夹自动备份并发布程序 备份/www/wwwroot/dirOnline文件夹,并将开发项目/www/wwwroot/dirDev自动同步到线上环境/www/wwwroot/dirOnli…