DataX(MySQL同步数据到Doris)

1.场景

这里演示介绍的使用 Doris 的 Datax 扩展 DorisWriter实现从Mysql数据定时抽取数据导入到Doris数仓表里

2.编译 DorisWriter

这个的扩展的编译可以不在 doris 的 docker 编译环境下进行,本文是在 windows 下的 WLS 下进行编译的

首先从github上拉取源码

 git clone https://github.com/apache/incubator-doris.git

进入到incubator-doris/extension/DataX/ 执行编译

首先执行:

sh init_env.sh
这个脚本主要用于构建 DataX 开发环境,他主要进行了以下操作:

  1. 将 DataX 代码库 clone 到本地。
  2. 将 doriswriter/ 目录软链到 DataX/doriswriter 目录。
  3. 在 DataX/pom.xml 文件中添加 <module>doriswriter</module> 模块。
  4. 将 DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13 httpclient v4.5 在处理 307 转发时有bug。
  5. 这个脚本执行后,开发者就可以进入 DataX/ 目录开始开发或编译了。因为做了软链,所以任何对 DataX/doriswriter 目录中文件的修改,都会反映到 doriswriter/ 目录中,方便开发者提交代码

2.1 开始编译
这里我为了加快编译速度去掉了很多无用的插件:这里直接在Datax目录下的pom.xml里注释掉就行

 hbase11xreaderhbase094xreadertsdbreaderoceanbasev10readerodpswriterhdfswriteradswriterocswriteroscarwriteroceanbasev10writer

然后进入到incubator-doris/extension/DataX/ 目录下的 Datax 目录,执行编译

这里我是执行的将 Datax 编译成 tar 包,和官方的编译命令不太一样。

 mvn -U clean package assembly:assembly -Dmaven.test.skip=true

image.png

编译完成以后,tar 包在 Datax/target 目录下,你可以将这tar包拷贝到你需要的地方,这里我是直接在 datax 执行测试,这里因为的 python 版本是 3.x版本,需要将 bin 目录下的三个文件换成 python 3能之别的版本,这个你可以去下面的地址下载:

https://github.com/WeiYe-Jing...
将下载的三个文件替换 bin 目录下的文件以后,整个编译,安装就完成了

如果你编译不成功也可以从我的百度网盘上下载编译好的包,注意我上边编译去掉的那些插件

链接: https://pan.baidu.com/s/1ObQ4Md0A_0ut4O6-_gPSQg

提取码: 424s 

3.数据接入

这个时候我们就可以开始使用 Datax 的doriswriter扩展开始从 Mysql(或者其他数据源)直接将数据抽取出来导入到 Doris 表中了。

3.1 Mysql 数据库准备

下面是我数据库的建表脚本(mysql 8):

CREATE TABLE `order_analysis` (`date` varchar(19) DEFAULT NULL,`user_src` varchar(9) DEFAULT NULL,`order_src` varchar(11) DEFAULT NULL,`order_location` varchar(2) DEFAULT NULL,`new_order` int DEFAULT NULL,`payed_order` int DEFAULT NULL,`pending_order` int DEFAULT NULL,`cancel_order` int DEFAULT NULL,`reject_order` int DEFAULT NULL,`good_order` int DEFAULT NULL,`report_order` int DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT

示例数据:

INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-12 00:00:00', '广告二维码', 'Android APP', '上海', 15253, 13210, 684, 1247, 1000, 10824, 862);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-14 00:00:00', '微信朋友圈H5页面', 'iOS APP', '广州', 17134, 11270, 549, 204, 224, 10234, 773);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '地推二维码扫描', 'iOS APP', '北京', 16061, 9418, 1220, 1247, 458, 13877, 749);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-17 00:00:00', '微信朋友圈H5页面', '微信公众号', '武汉', 12749, 11127, 1773, 6, 5, 9874, 678);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '地推二维码扫描', 'iOS APP', '上海', 13086, 15882, 1727, 1764, 1429, 12501, 625);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-18 00:00:00', '微信朋友圈H5页面', 'iOS APP', '武汉', 15129, 15598, 1204, 1295, 1831, 11500, 320);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '地推二维码扫描', 'Android APP', '杭州', 20687, 18526, 1398, 550, 213, 12911, 185);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-19 00:00:00', '应用商店', '微信公众号', '武汉', 12388, 11422, 702, 106, 158, 5820, 474);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-20 00:00:00', '微信朋友圈H5页面', '微信公众号', '上海', 14298, 11682, 1880, 582, 154, 7348, 354);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-21 00:00:00', '地推二维码扫描', 'Android APP', '深圳', 22079, 14333, 5565, 1742, 439, 8246, 211);INSERT INTO `sql12298540`.`order_analysis` (`date`, `user_src`, `order_src`, `order_location`, `new_order`, `payed_order`, `pending_order`, `cancel_order`, `reject_order`, `good_order`, `report_order`) VALUES ('2015-10-22 00:00:00', 'UC浏览器引流', 'iOS APP', '上海', 28968, 18151, 7212, 2373, 1232, 10739, 578);

3.2 doris数据库准备

下面是我上面数据表在doris对应的建表脚本

CREATE TABLE `order_analysis` (`date` datetime DEFAULT NULL,`user_src` varchar(30) DEFAULT NULL,`order_src` varchar(50) DEFAULT NULL,`order_location` varchar(10) DEFAULT NULL,`new_order` int DEFAULT NULL,`payed_order` int DEFAULT NULL,`pending_order` int DEFAULT NULL,`cancel_order` int DEFAULT NULL,`reject_order` int DEFAULT NULL,`good_order` int DEFAULT NULL,`report_order` int DEFAULT NULL) ENGINE=OLAPDUPLICATE KEY(`date`,user_src)COMMENT "OLAP"DISTRIBUTED BY HASH(`user_src`) BUCKETS 1PROPERTIES ("replication_num" = "3","in_memory" = "false","storage_format" = "V2");

3.3 Datax Job JSON文件

创建并编辑datax job任务json文件,并保存到指定目录

 {"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"record": 0,"percentage": 0}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "zh","column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order" ],"connection": [ { "table": [ "order_analysis" ], "jdbcUrl": [ "jdbc:mysql://localhost:3306/demo" ] } ] }},"writer": {"name": "doriswriter","parameter": {"feLoadUrl": ["fe:8030"],"beLoadUrl": ["be1:8040","be1:8040","be1:8040","be1:8040","be1:8040","be1:8040"],"jdbcUrl": "jdbc:mysql://fe:9030/","database": "test_2","table": "order_analysis","column": ["date","user_src","order_src","order_location","new_order","payed_order"," pending_order"," cancel_order"," reject_order"," good_order"," report_order"],"username": "root","password": "","postSql": [],"preSql": [],"loadProps": {},"maxBatchRows" : 10000,"maxBatchByteSize" : 104857600,"labelPrefix": "datax_doris_writer_demo_","lineDelimiter": "\n"}}}]}}

这块 Mysql reader 使用方式参照:

https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

doriswriter的使用及参数说明:

 https://github.com/apache/incubator-doris/blob/master/extension/DataX/doriswriter/doc/doriswriter.md

或者

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "My",
                        "column": ["id","md5","eid","industry_code","start_date","end_date","is_valid","source","create_time ","update_time","row_update_time","local_row_update_time"],
                        "connection": [ { "table": [ "t_last_industry_all" ], "jdbcUrl": [ "jdbc:mysql://IP:3306/log" ] } ] }
                },
                "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "feLoadUrl": ["IP:8030"],
                        "beLoadUrl": ["IP:8040"],
                        "jdbcUrl": "jdbc:mysql://IP:9030/",
                        "database": "mysqltodoris",
                        "table": "t_last",
                        "column": ["id","md5","eid","industry_code","start_date","end_date","is_valid","source","create_time ","update_time","row_update_time","local_row_update_time"],
                        "username": "root",
                        "password": "123456",
                        "postSql": [],
                        "preSql": [],
                        "loadProps": {
                        },
                        "maxBatchRows" : 300000,
                        "maxBatchByteSize" : 20971520
                    }
                }
            }
        ]
    }
}

4.执行Datax数据导入任务

python bin/datax.py doris.json
然后就可以看到执行结果:

再去 Doris 数据库中查看你的表,数据就已经导入进去了,任务执行结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/105827.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023.9.8 基于传输层协议 UDP 和 TCP 编写网络通信程序

目录 UDP 基于 UDP 编写网络通信程序 服务器代码 客户端代码 TCP 基于 TCP 编写网络通信程序 服务器代码 客户端代码 IDEA 打开 支持多客户端模式 UDP 特点&#xff1a; 无连接性&#xff1a;发送端和接收端不需要建立连接也可相互通信&#xff0c;且每个 UDP 数据包都…

qt作业day5

//客户端&#xff0c;#include "tcpcli.h" #include "ui_tcpcli.h"TcpCli::TcpCli(QWidget *parent) :QWidget(parent),ui(new Ui::TcpCli) {ui->setupUi(this);//给客户端指针实例化对象cli_sock new QTcpSocket(this);ui->discntBtn->setEnabl…

HCIA自学笔记01-冲突域

共享式网络&#xff08;用同一根同轴电缆通信&#xff09;中可能会出现信号冲突现象。 如图是一个10BASE5以太网&#xff0c;每个主机都是用同一根同轴电缆来与其它主机进行通信&#xff0c;因此&#xff0c;这里的同轴电缆又被称为共享介质&#xff0c;相应的网络被称为共享介…

Prometheus+Grafana可视化监控【主机状态】

文章目录 一、介绍二、安装Prometheus三、安装Grafana四、Pronetheus和Grafana相关联五、监控服务器状态六、常见问题 一、介绍 Prometheus是一个开源的系统监控和报警系统&#xff0c;现在已经加入到CNCF基金会&#xff0c;成为继k8s之后第二个在CNCF托管的项目&#xff0c;在…

【pytest】tep环境变量、fixtures、用例三者之间的关系

tep是一款测试工具&#xff0c;在pytest测试框架基础上集成了第三方包&#xff0c;提供项目脚手架&#xff0c;帮助以写Python代码方式&#xff0c;快速实现自动化项目落地。 在tep项目中&#xff0c;自动化测试用例都是放到tests目录下的&#xff0c;每个.py文件相互独立&…

springboot项目配置flyway菜鸟级别教程

1、Flyway的工作原理 Flyway在第一次执行时&#xff0c;会创建一个默认名为flyway_schema_history的历史记录表&#xff0c;这张表会用来跟踪或记录数据库的状态&#xff0c;然后每次项目启动时都会自动扫描在resources/db/migration下的文件的版本号并且通过查询flyway_schem…

九安监控初始化后恢复案例

九安监控是国内一个十六线小安防品牌&#xff0c;目前CHS零壹视频恢复程序监控版、专业版、高级版是支持这个安防品牌的&#xff0c;不过下边这个案例比较特殊&#xff0c;具体情况如下。 故障存储:希捷4T监控专用硬盘 故障现象: 客户描述是使用了初始化操作&#xff0c;正常…

LeetCode //C - 114. Flatten Binary Tree to Linked List

114. Flatten Binary Tree to Linked List Given the root of a binary tree, flatten the tree into a “linked list”: The “linked list” should use the same TreeNode class where the right child pointer points to the next node in the list and the left child …

游戏发行平台都有什么服务和功能?

游戏发行平台通常提供一系列服务和功能&#xff0c;以帮助游戏开发商将游戏推向市场&#xff0c;并为玩家提供游戏。以下是一些常见的游戏发行平台服务和功能&#xff1a; 1、游戏发布 发行平台允许游戏开发商将游戏上传到平台上&#xff0c;以供玩家下载和安装。 2、游戏销售…

Redis7--基础篇1(概述,安装、卸载及配置)

1. Redis概述 1.1 什么是Redis Redis&#xff1a;REmote Dictionary Server&#xff08;远程字典服务器&#xff09; Remote Dictionary Server(远程字典服务)是完全开源的&#xff0c;使用ANSIC语言编写遵守BSD协议&#xff0c;是一个高性能的Key-Value数据库提供了丰富的数…

Python 操作 CSV

使用过 CSV 文件都知道&#xff1a;如果我们的电脑中装了 WPS 或 Microsoft Office 的话&#xff0c;.csv 文件默认是被 Excel 打开的&#xff0c;那么什么是 CSV 文件&#xff1f;CSV 文件与 Excel 文件有什么区别&#xff1f;如何通过 Python 来操作 CSV 文件呢&#xff1f;带…

mfc 浮动窗口

参考 MFC模拟360悬浮窗加速球窗口