Maxwell简介&使用
Maxwell 介绍
Maxwell 是由美国
zendesk 开源,用 java 编写的 Mysql 实时抓取软件,
其抓取的 原理也是基于 binlog。
官网
Maxwell 和 canal 工具对比
➢
Maxwell 没有 canal 那种 server+client 模式,只有一个 server 把数据发送到消息队列 或 redis。如果需要多个实例,通过指定不同配置文件启动多个进程。
➢
Maxwell 有一个亮点功能,就是 canal 只能抓取最新数据,对已存在的历史数据没有办
法处理。而 Maxwell 有一个 bootstrap 功能,可以直接引导出完整的历史数据用于初
始化,非常好用。
➢
Maxwell 不能直接支持 HA,但是它支持断点还原,即错误解决后重启继续上次点儿读
取数据,canal也支持。
➢
Maxwell 只支持 json 格式,而 Canal 如果用 Server+client 模式的话,可以自定义格
式。
➢
Maxwell 比 Canal 更加轻量级。
安装使用
前提开启了mysql的binlog在cannel使用的时候已经讲解
前提
创建保存断点续传的数据库,并且创建maxwell用户密码为123456,对于shishimaxwell数据库给maxwell授予权限
mysql -uroot -p123456
CREATE DATABASE shishimaxwell ;
#maxwell是用戶名
GRANT ALL ON shishimaxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
#maxwell是用戶名
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
tar -zxvf maxwell-1.25.0.tar.gz
修改配置
cp config.properties.example
vi config.properties
producer=kafka
kafka.bootstrap.servers=master:9092,node1:9092,node2:9092
#需要添加
kafka_topic=gmall_db
# mysql login info,这里的mysql既是开启binlog的mysql,也是自己创建保存读取位置shishimaxwell的mysql
#数据库host
host=master
#数据库用户
user=maxwell
#数据库密码
password=123456
#需要添加 后续bootstrap初始化会用
client_id=maxwell_1
#指定消费位置保存的数据库
schema_database=shishimaxwell
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱
binlog 的顺序
如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性
可选值 producer_partition_by=database|table|primary_key|random| column
修改表结构注意
如果是修改了表的结构,修改以后打印的数据为
ADD COLUMN `xinde` varchar(255) NULL COMMENT '修改时间' AFTER `operate_time`" to gmall, new schema id is 2tbeat=0] after applying "ALTER TABLE `gmall`.`comment_info`
但是如果没有配置schema_host那么修改表结构的数据是不会发送到kafka的,也就是说普通模式只会吧增删改查的数据发送到kafka

启动
/home/bigdata/shishishell/maxwell/maxwell-1.25.0/bin/maxwell --config /home/bigdata/shishishell/maxwell/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
查看数据库是否生成了表

数据产生的格式
增
{
"database": "gmall",
"table": "comment_info",
"type": "insert",
"ts": 1657694285,
"xid": 79219,
"commit": true,
"data": {
"id": 1547108081146556427,
"user_id": 682,
"nick_name": null,
"head_img": null,
"sku_id": 24,
"spu_id": 8,
"order_id": 6897,
"appraise": "1204",
"comment_txt": "评论内容:82154639985516955573787189871861313164594735727667",
"create_time": "2022-06-28 14:38:04",
"operate_time": null
}
}
删
{
"database": "gmall",
"table": "base_dic",
"type": "delete",
"ts": 1657694385,
"xid": 111301,
"commit": true,
"data": {
"dic_code": "1101",
"dic_name": "支付宝",
"parent_code": "11",
"create_time": null,
"operate_time": null
}
}
改
{
"database": "gmall",
"table": "base_dic",
"type": "update",
"ts": 1657694417,
"xid": 111341,
"commit": true,
"data": {
"dic_code": "1103",
"dic_name": "银联1",
"parent_code": "11",
"create_time": null,
"operate_time": null
},
"old": {
"dic_name": "银联"
}
}
bootstap全量
--client_id maxwell_1就是上面配置的东西,它的作用是bootstap只是查询出了数据,而--client_id maxwell_1是指点配置文件里面使用maxwell的一个客户端同步数据
bin/maxwell-bootstrap --user maxwell --password 123456 --host master --database gmall --table user_info --client_id maxwell_1
生成的数据如下
开始的一条数据为空
{
"database": "gmall",
"table": "user_info",
"type": "bootstrap-start",
"ts": 1657718813,
"data": {}
}
{
"database": "gmall",
"table": "user_info",
"type": "bootstrap-insert",
"ts": 1657718505,
"data": {
"id": 6800,
"login_name": "m9ml9e7xq4bx",
"nick_name": "寒寒",
"passwd": null,
"name": "元寒",
"phone_num": "13132628314",
"email": "m9ml9e7xq4bx@yeah.net",
"head_img": null,
"user_level": "1",
"birthday": "1998-05-28",
"gender": "F",
"create_time": "2022-06-29 04:49:45",
"operate_time": null,
"status": null
}
}
总结数据特点
➢
日志结构
canal
每一条 SQL 会产生一条日志,如果该条 Sql 影响了多行数据,则已经会通过集
合的方式归集在这条日志中
。(即使是一条数据也会是数组结构)
maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果
想知道这些日志是否是通过某一条 sql 产生的可以通过 xid 进行判断,相同的 xid 的日志来
自同一 sql。
➢
数字类型
当原始数据是数字类型时,
maxwell 会尊重原始数据的类型不增加双引
,变为字符串。
canal 一律转换为字符串。
➢
带原始数据字段定义
canal 数据中会带入表结构
。maxwell 更简洁。
配置文件模板
log_level=info
#producer=stdout
producer=kafka
kafka.compression.type=snappy
kafka.batch.size=10000
kafka.request.timeout.ms = 360000
kafka.retries=3
kafka.acks=-1
kafka.bootstrap.servers=ip:9092,ip:9092,ip:9092
kafka_topic=%{database}_%{table}
#output_ddl=true
producer_partition_by=primary_key
kafka_partition_hash=murmur3
# mysql login info
host=localhost
port=3306
user=maxwell
password=密码
replication_host=mysql_host
replication_user=username
replication_password=password
replication_port=3306
filter=exclude: *.*, include: 数据库.表, include: 数据库.表, include: 数据库.表
jdbc_options=serverTimeZone=Asia/Shanghai&characterEncoding=utf-8&useSSL=false
client_id=maxwell_1
replica_server_id=1
下面文章还行