Flink CDC系列之:调研应用Flink CDC将 ELT 从 MySQL 流式传输到 Doris方案
- 准备
- 准备 Flink Standalone 集群
- 准备 docker compose
- 为 MySQL 准备记录
- 在 Doris 中创建数据库
- 使用 Flink CDC CLI 提交作业
- 同步架构和数据更改
- 路由变更
- 清理
本教程将展示如何使用 Flink CDC 快速构建从 MySQL 到 Doris 的 Streaming ELT 作业,包括同步一个数据库的所有表、模式变更演变和将分片表同步到一张表的功能。
本教程中的所有练习都在 Flink CDC CLI 中执行,整个过程使用标准 SQL 语法,无需一行 Java/Scala 代码或 IDE 安装。
准备
准备一台安装了 Docker 的 Linux 或 MacOS 电脑。
准备 Flink Standalone 集群
下载 Flink 1.18.0 ,解压得到 flink-1.18.0 目录。
使用以下命令进入 Flink 目录,并将 FLINK_HOME 设置为 flink-1.18.0 所在的目录。
cd flink-1.18.0
通过将以下参数附加到 conf/flink-conf.yaml 配置文件来启用检查点,每 3 秒执行一次检查点。
execution.checkpointing.interval: 3000
使用以下命令启动 Flink 集群。
./bin/start-cluster.sh
如果启动成功,你就可以通过http://localhost:8081/访问Flink Web UI,如下所示。
多次执行start-cluster.sh可以启动多个TaskManager。
准备 docker compose
以下教程将使用docker-compose准备所需的组件。
主机配置
由于Doris需要内存映射支持才能运行,因此在主机上执行以下命令:
sysctl -w vm.max_map_count=2000000
由于 MacOS 内部实现容器的方式不同,部署时可能无法直接修改主机上的 max_map_count 值,需要先创建以下容器:
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
执行以下命令已成功创建容器:
sysctl -w vm.max_map_count=2000000
然后exit退出,并创建Doris Docker集群。
启动 docker compose 使用下面提供的内容创建一个 docker-compose.yml 文件:
version: '2.1'
services:doris:image: yagagagaga/doris-standaloneports:- "8030:8030"- "8040:8040"- "9030:9030"mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw
Docker Compose 应包含以下服务(容器):
MySQL:包含一个名为 app_db 的数据库
Doris:用于存储来自 MySQL 的表
docker-compose up -d
要启动所有容器,请在包含 docker-compose.yml 文件的目录中运行以下命令。
该命令会自动以分离模式启动 Docker Compose 配置中定义的所有容器。运行 docker ps 可以检查这些容器是否正常运行。也可以访问 http://localhost:8030/ 查看 Doris 是否正在运行。
为 MySQL 准备记录
进入 MySQL 容器
docker-compose exec mysql mysql -uroot -p123456
创建 app_db 数据库和订单、产品、发货表,然后插入记录
-- create database
CREATE DATABASE app_db;USE app_db;-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
在 Doris 中创建数据库
Doris Connector 目前不支持自动创建数据库,需要先创建写入表对应的数据库。
- 进入 Doris Web UI。http://localhost:8030/
- 默认用户名是 root,默认密码为空。
通过 Web UI 创建 app_db 数据库。
create database app_db;
使用 Flink CDC CLI 提交作业
- 下载下面列出的二进制压缩包并解压到目录 flink cdc-3.1.0’:
flink-cdc-3.1.0-bin.tar.gz flink-cdc-3.1.0 目录下会包含四个目录:bin、lib、log、conf。 - 下载下面列出的连接器包并移动到 lib 目录
下载链接只针对稳定版本,SNAPSHOT 依赖需要自行基于 master 或 release 分支构建。请注意,需要将 jar 移动到 Flink CDC Home 的 lib 目录,而不是 Flink Home 的 lib 目录。- MySQL 管道连接器 3.1.0
- Apache Doris 管道连接器 3.1.0
您还需要将 MySQL 连接器放入 Flink lib 文件夹或使用 --jar 参数传递它,因为它们不再与 CDC 连接器一起打包:
- MySQL Connector Java
编写任务配置yaml文件下面是同步整个数据库的示例文件mysql-to-doris.yaml:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2
注意:
source中的tables: app_db..*通过正则匹配同步app_db中的所有表。
sink中的table.create.properties.replication_num是因为Docker镜像中只有一个Doris BE节点。
最后,使用Cli将作业提交到Flink Standalone集群。
bash bin/flink-cdc.sh mysql-to-doris.yaml
提交成功后返回信息如下:
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
我们可以通过 Flink Web UI 找到一个名为“Sync MySQL Database to Doris”的作业正在运行。
我们可以发现表是通过 Doris Web UI 创建和插入的。
同步架构和数据更改
进入MySQL容器
docker-compose exec mysql mysql -uroot -p123456
然后修改MySQL中的schema和记录,Doris的表也会实时改变:
在MySQL中的orders中插入一条记录:
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
在 MySQL 的订单中添加一列:
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
从 MySQL 更新订单中的一条记录:
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
从 MySQL 中删除订单中的一条记录:
DELETE FROM app_db.orders WHERE id=2;
每执行一步刷新一下 Doris Web UI,可以看到 Doris 中展示的订单表会实时更新,如下图:
同样的,通过修改 ‘shipments’ 和 ‘products’ 表,也可以在 Doris 中实时看到同步修改的结果。
路由变更
Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置。
利用此功能,我们可以实现表名、数据库名替换、全库同步等功能。以下是使用路由功能的示例文件:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2
通过上面的路由配置,我们可以将app_db.orders的表结构和数据同步到ods_db.ods_orders中,从而实现数据库迁移的功能。具体来说,source-table支持正则匹配多表来同步分库分表,如下:
route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders
这样我们就可以将app_db.order01、app_db.order02、app_db.order03等分片表同步到一张ods_db.ods_orders表中了。
注意,目前还不支持多张表存在相同主键数据的场景,后续版本会支持。
清理
完成教程后,运行以下命令停止docker-compose.yml目录中的所有容器:
docker-compose down
在Flink flink-1.18.0目录下,执行以下命令停止Flink集群:
./bin/stop-cluster.sh