canal简介
官网:https://github.com/alibaba/canal
主要是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,是一个实时同步的方案。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
工作原理
canal相当于一个mysql slave节点,工作原理如下:
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
canal各个组件
canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client(消费端)支持多种语言:
- canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
- canal c# 客户端: https://github.com/dotnetcore/CanalSharp
- canal go客户端: https://github.com/CanalClient/canal-go
- canal php客户端: https://github.com/xingwenge/canal-php
- canal Python客户端:https://github.com/haozi3156666/canal-python
- canal Rust客户端:https://github.com/laohanlinux/canal-rs
- canal Nodejs客户端:https://github.com/marmot-z/canal-nodejs
记住:canal是基于订阅和消费机制的,这边的client就是消费端,为了支持更多的语言或者防止消费能力不足,可以把消息直接投递到mq,借助mq的削峰平谷的能力。
- 最重要的组件是deployer,也就是server
- admin是一个webUI的动态管理组件,根据需要搭建
- example是client的一些例子
- adapter是一种客户端数据落地的适配,不如你想把数据同步到hbase,你可以直接用adapter,adapter会帮忙你做数据格式转换等,当然也可以通过客户端自己写。
实践
环境版本
- mysql版本:5.7.24 docker版
- canal版本:1.1.8-alpha-3
docker部署mysql
启动一个实例,如下
docker run -d --name mysql-test -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql:5.7.24
修改容器内/etc/mysql/mysql.conf.d/mysqld.cnf的内容,在末尾增加如下3行内容,可以通过docker cp来修改
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
#log-error = /var/log/mysql/error.log
# By default we only accept connections from localhost
#bind-address = 127.0.0.1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# 以下3行是新增的
server_id=1
log_bin=mysql-bin
binlog_format=ROW
重新启动容器
docker restart mysql-test
通过客户端(比如DBeaver连接mysql),查看这个配置参数
创建数据库test
创建表sys_user
CREATE TABLE `sys_user` (`user_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',`user_name` varchar(60) NOT NULL COMMENT '用户昵称',`email` varchar(50) DEFAULT '' COMMENT '用户邮箱',`sex` char(1) DEFAULT '0' COMMENT '用户性别(0男 1女 2未知)',`password` varchar(50) DEFAULT '' COMMENT '密码',`status` char(1) DEFAULT '0' COMMENT '帐号状态(0正常 1停用)',PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用户信息表';
到此mysql的准备工作已结束,重点在于开启bin log,并设置format为ROW,因为canal是作为mysql slave存在的,需要通过binlog来同步数据。
部署canal-server
通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到
canal.deployer-1.1.8-SNAPSHOT.tar.gz,解压缩得到如下的目录
打开conf/example/instance.properties配置源数据库(mysql)的信息,我这边偷懒直接用root账号,密码来自docker部署mysql启动时设置的密码
进入bin,直接点击startup.bat(windows环境)
然后打开logs/canal/canal.log,如果出现以下信息表示启动成功。接下来就可以通过客户端去订阅了。
例子1:canal-client的使用
在idea中新建一个maven项目,引入client的依赖
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version></dependency>
新增client类-CanalClient,代码如下:
package com.example.demo.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class CanalClient {public static void main(String[] args) throws InvalidProtocolBufferException {// 1. 创建链接,即server的地址和端口,其中example就是源数据库的名称(来自canal.deployer下的conf下的example文件及名称)CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111), "example", "", "");while (true) {// 2. 获取连接connector.connect();// 3. 指定订阅的数据库testconnector.subscribe("test.*");// 4. 获取MessageMessage message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size() == 0) {System.out.println("没有数据,休息下");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {for (CanalEntry.Entry entry : entries) {// 获取表名String tableName = entry.getHeader().getTableName();// Entry类型CanalEntry.EntryType entryType = entry.getEntryType();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 获取序列化数据ByteString storeValue = entry.getStoreValue();// 反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取事件类型CanalEntry.EventType eventType = rowChange.getEventType();// 获取具体数据List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();// 遍历并打印数据for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();Map<String, Object> bMap = new HashMap<>();for (CanalEntry.Column column : beforeColumnsList) {bMap.put(column.getName(), column.getValue());}List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();Map<String, Object> aMap = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {aMap.put(column.getName(), column.getValue());}System.out.println("表名:" + tableName + ",操作类型:" + eventType);System.out.println(",变化前:" + bMap);System.out.println(",变化后:" + aMap);}}}}}}
}
启动后,控制台打印如下:
当往数据库插入一条记录
insert sys_user(user_name, email, sex, `password`, status) values('张三','zhangsan@qq.com', 0, '123',0);
结果,控制台打印如下:
当修改记录时,控制台打印如下
update sys_user set `password`='123456' where user_name='张三';
当删除记录时,控制台打印如下
delete from sys_user where user_name='张三';
至此获取到的数据的变化,你可以把他手动转换、处理等等操作,也可以同步到其他的数据库,灵活性很大,但是如果当只是想把数据同步到其他数据库时,可以直接选择使用client-adapter,不需要编码,client-adapter可以认为是针对特定场景的加强版的客户端。
例子2:springboot集成canal-client
这边采用非官方封装的jar包,但是使用很方便,首先新建一个springboot项目(略)
引入以下maven依赖
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.6-RELEASE</version>
</dependency>
使用很简单,比如同步数据到redis,详细参考官网:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client
例子3:同步数据到clickhouse(client-adapter的使用)
clickhouse-adapter是1.18新增的,不知是我的配置还有问题,当sys_user没有id字段时,只有新增能正常同步到clickhouse,故把sys_user.user_id改为id。
先部署clickhouse,在default新建表,建表语句如下:
CREATE TABLE default.test_user
(`id` Int64,`user_name` String,`email` String,`sex` String,`password` String,`status` String
)
ENGINE = MergeTree
PRIMARY KEY (id) -- 明确指定 PRIMARY KEY
ORDER BY (id) -- ORDER BY 是必须的
SETTINGS index_granularity = 8192;
通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到
canal.adapter-1.1.8-SNAPSHOT.tar.gz
解压缩得到如下的目录
在conf下新增clickhouse文件夹,并且新建test_user.yml文件,内容如下:
#dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: clickhouse1
concurrent: true
dbMapping:database: testtable: sys_usertargetTable: default.test_usertargetPk:id: id
# mapAll: truetargetColumns:id:user_name:email:sex:password:status: #etlCondition: "where id>={}"commitBatch: 3000 # 批量提交的大小
修改conf下的application.yml,把outerAdapters: 下的配置改为如下:主要是配置下目标clickhouse数据库的信息。
启动bin下的startup.bat(windows系统),当看到如下信息表示启动成功
此时去mysql中新增几条记录,adapter的控制台会打印出对应的信息:
查看clickhouse,会发现记录已同步,删除和修改类似的。
部署canal-admin
新建数据库canal_manager,用于admin,这边偷个懒,还是用root账号
通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到
canal.admin-1.1.8-SNAPSHOT.tar.gz
解压缩得到如下的目录
把conf下的canal_manager.sql sql导入数据库中,进行数据初始化。
修改conf/application.yml的配置,如下:
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: rootpassword: 123456driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin
启动bin/startup.bat
此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
关闭admin,接下去要把canal-server对接到admin上,
修改canal-server的conf下的canal_local.properties,内容如下:
# register ip
canal.register.ip =# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
然后在canal-deploy目录下以local的配置启动canal-server
$ bin/startup.bat local
重启canal-admin
进入网页端,能看到如下信息,server管理已经存在一个新的节点了
使用心得
canal定位是一个增量日志同步,但是也有方式可以设置全量同步,但都需要canal-adapter支持
- canal可以设置全库同步,详见参考:Sync RDB · alibaba/canal Wiki · GitHub
- 对于表级别的全量同步,可以使用etl接口,详见:ClientAdapter · alibaba/canal Wiki · GitHub
- 配合datax使用,datax先全量同步,再使用canal增量同步。
题外话
canal作为一个国内使用非常广的实时同步项目,经过一系列的入门调研到现在想放弃了,因为这个项目的维护不行了,一年只发一个版本,作为竞品debezium和flinkcdc不管是版本发布还是issue的回复都很火热,关键是发现了alpha版的etl bug存在大半年了,还没修复,所以想弃坑,转向flink cdc了。