CentOS安装maxwell

CentOs安装maxwell

    • 一、简介
    • 二、准备工作
    • 三、安装
      • 1、下载安装包
      • 2、解压
      • 3、编写配置文件
      • 4、启动maxwell
      • 5、验证
      • 6、停止maxwell
    • 四、说明
      • 1、更新数据
      • 2、插入数据
      • 3、删除数据
    • 五、遇到问题

一、简介

        maxwell是由美国Zendesk公司开源,它通过读取mysql的binlog日志,将数据变更以JSON的方式发送给Kafka, Kinesis等流数据处理平台。
        This is Maxwell’s daemon, a change data capture application that reads MySQL binlogs and writes data changes as JSON to Kafka, Kinesis, and other streaming platforms.

官网地址
源码地址

二、准备工作

本次实现mysql数据同步给kafka,所以mysql与kafka需要提前准备好。
mysql表结构如下:

CREATE TABLE `user_info` (`id` int NOT NULL AUTO_INCREMENT,`user_id` int NOT NULL,`username` varchar(255) NOT NULL,`email` varchar(255) NOT NULL,`phone_number` varchar(30) DEFAULT NULL,`status` enum('active','inactive') DEFAULT 'active',`score` int unsigned DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=35464335 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

特别注意:需要启用mysql的binlog

vim /etc/my.cnf

增加如下内容:

#数据库id
server-id = 1
#该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row

三、安装

本次使用版本:V1.29.2
注:maxwell-1.30.0及以上版本不再支持JDK1.8

1、下载安装包

下载安装包V1.29.2

2、解压

tar -zxvf maxwell-1.29.2.tar.gz

3、编写配置文件

在解压目录下给了一个示例文件config.properties.example

cp config.properties.example config.properties

编辑内容参考如下:

# tl;dr config
log_level=info#maxwell同步数据的去向,支持stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis等
producer=kafka
kafka.bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
#kafka的topic如下是动态配置
kafka_topic=%{database}_%{table}# mysql login info
host=***********
port=3306
user=root
password=************
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8

4、启动maxwell

进入解压目录执行如下命令:

./bin/maxwell --config ./config.properties --daemon

查看日志已启动成功:
在这里插入图片描述

5、验证

启动一个消费者,在数据库操作数据,然后观察kafka

kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic hadoop_user_info

在这里插入图片描述

6、停止maxwell

ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

四、说明

maxwell同步输出格式如下:

1、更新数据

UPDATE `hadoop`.`user_info` SET `id` = '148254', `user_id` = '8321174' WHERE (`id` = '1482');
{"database": "hadoop","table": "user_info","type": "update","ts": 1705314481,"xid": 10903,"commit": true,"data": {"id": 148254,"user_id": 8321174,"username": "batesanthony","email": "justin62@example.com","phone_number": "+1-982-342-3093x988","status": "inactive","score": 99},"old": {"id": 1482,"user_id": 832117}
}

2、插入数据

INSERT INTO `hadoop`.`user_info` (`id`, `user_id`, `username`, `email`, `phone_number`, `status`, `score`) VALUES ('14832247', '57377145', 'joseph90', 'tbarnett@example.net', '295-683-4540x37958', 'active', '100');
{"database": "hadoop","table": "user_info","type": "insert","ts": 1705314503,"xid": 10966,"commit": true,"data": {"id": 14832247,"user_id": 57377145,"username": "joseph90","email": "tbarnett@example.net","phone_number": "295-683-4540x37958","status": "active","score": 100}
}

3、删除数据

DELETE FROM `hadoop`.`user_info` WHERE (`id` = '1483');
{"database": "hadoop","table": "user_info","type": "delete","ts": 1705314531,"xid": 11056,"commit": true,"data": {"id": 1483,"user_id": 573771,"username": "joseph90","email": "tbarnett@example.net","phone_number": "295-683-4540x37958","status": "active","score": 100}
}

JSON字段说明如下:

字段说明
database同步数据所属的数据库
table同步数据所属的数据库表
type数据变更的类型(insert、update、delete)
ts数据同步的时间戳
xid事务id
commit事务提交标志
data同步的具体数据属性与值
old在update类型中,表示变更的相关字段之前的值

五、遇到问题

  • java.lang.RuntimeException: error: unhandled character set ‘utf8mb3’
    在这里插入图片描述

解决该问题可修改源码,然后重新打包替换掉对应的jar即可。

详情可以参考这篇文章

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/410162.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink(十三)【Flink SQL(上)】

前言 最近在假期实训,但是实在水的不行,三天要学完SSM,实在一言难尽,浪费那时间干什么呢。SSM 之前学了一半,等后面忙完了,再去好好重学一遍,毕竟这玩意真是面试必会的东西。 今天开始学习 Flin…

Rust-Panic

什么是panic 在Rust中,有一类错误叫作panic。示例如下: 编译,没有错误,执行这段程序,输出为: 这种情况就引发了一个panic。在这段代码中,我们调用了Option::unwrap()方法,正是这个方…

50天精通Golang(第18天)

web开发介绍、iris框架安装、HTTP请求和返回、Iris路由处理 一 Web项目开发介绍及实战项目介绍 1.1 引言 本系列课程我们将学些Golang语言中的Web开发框架Iris的相关知识和用法。通过本系列视频课程,大家能够从零到一经历一个完整项目的开发,并在课程…

旧路由重置新路由设置新路由设置教程|适用于PPPoE拨号

前言 前几天朋友说路由器想要重置,但不知道怎么弄。所以就想着只帮忙重置路由器的话,只能帮到一个人。但把整个过程写成图文,就可以帮助更多人。 本文章适合电脑小白,请注意每一步哦! 注意事项 开始之前需要确认光猫…

1.6计算机网络的性能指标

1.6计算机网络的性能指标 常用的计算机网络的性能指标有7个:速率、带宽、吞吐量、时延、往返时间、利用率、丢包率 1.6.1速率 计算机发送的信号是以二进制数字形式的。一个二进制数字就是一个比特(bit,binary digit)字节:Byte,1Byte8bit(1…

Microsoft Excel 直方图

Microsoft Excel 直方图 1. 数据示例2. 打开 EXCEL3. settings4. 单击直方图柱,右键“添加数据标签”References 1. 数据示例 2. 打开 EXCEL 数据 -> 数据分析 -> 直方图 3. settings 输入区域样本值、接受区域分类间距,输出选项选择“新工作表组…

【css】渐变效果

css渐变效果 使用 CSS 渐变可以在两种颜色间制造出平滑的渐变效果。 用它代替图片,可以加快页面的载入时间、减小带宽占用。同时,因为渐变是由浏览器直接生成的,它在页面缩放时的效果比图片更好,因此你可以更加灵活、便捷的调整页…

ElasticSearch降本增效常见的方法 | 京东云技术团队

Elasticsearch在db_ranking 的排名不断上升,其在存储领域已经蔚然成风且占有非常重要的地位。 随着Elasticsearch越来越受欢迎,企业花费在ES建设上的成本自然也不少。那如何减少ES的成本呢?今天我们就特地来聊聊ES降本增效的常见方法&#x…

ilqr 算法说明

1 Introduction 希望能用比较简单的方式将ilqr算法进行整理和总结。 2 HJB方程 假定我们现在需要完成一个从A点到B点的任务,执行这段任务的时候,每一步都需要消耗能量,可以用下面这个图表示。 我们在执行这个A点到B点的任务的时候&#xf…

【记录】重装系统后的软件安装

考完研重装了系统,安装软件乱七八糟,用到什么装什么。在这里记录一套标准操作,备用。一个个装还是很麻烦,我为什么不直接写个脚本直接下载安装包呢?奥,原来是我太菜了还不会写脚本啊!先记着吧&a…

Vulnhub-tr0ll-1

一、信息收集 端口收集 PORT STATE SERVICE VERSION 21/tcp open ftp vsftpd 3.0.2 | ftp-anon: Anonymous FTP login allowed (FTP code 230) |_-rwxrwxrwx 1 1000 0 8068 Aug 09 2014 lol.pcap [NSE: writeable] | ftp-syst: | STAT: | FTP …

B端产品经理学习-版本规划管理

首先我们回顾一下用户故事,用户故事有如下特点: PRD文档的特点则如下: B端产品中用户角色不同,需求侧重也不同 决策人——公司战略需求:转型升级、降本增效、品牌提升等 管理负责人——公司管理需求:提升…