您的位置:首页 > 科技 > 能源 > 零基础如何做运营_桂林网页制作_seo站长网_自助建站的优势

零基础如何做运营_桂林网页制作_seo站长网_自助建站的优势

2024/12/22 2:46:01 来源:https://blog.csdn.net/jiandanfeng2/article/details/144058624  浏览:    关键词:零基础如何做运营_桂林网页制作_seo站长网_自助建站的优势
零基础如何做运营_桂林网页制作_seo站长网_自助建站的优势

canal简介

官网:https://github.com/alibaba/canal

主要是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,是一个实时同步的方案。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

工作原理

   canal相当于一个mysql slave节点,工作原理如下:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

canal各个组件

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client(消费端)支持多种语言:

  • canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample
  • canal c# 客户端: https://github.com/dotnetcore/CanalSharp
  • canal go客户端: https://github.com/CanalClient/canal-go
  • canal php客户端: https://github.com/xingwenge/canal-php
  • canal Python客户端:https://github.com/haozi3156666/canal-python
  • canal Rust客户端:https://github.com/laohanlinux/canal-rs
  • canal Nodejs客户端:https://github.com/marmot-z/canal-nodejs

记住:canal是基于订阅和消费机制的,这边的client就是消费端,为了支持更多的语言或者防止消费能力不足,可以把消息直接投递到mq,借助mq的削峰平谷的能力

  • 最重要的组件是deployer,也就是server
  • admin是一个webUI的动态管理组件,根据需要搭建
  • example是client的一些例子
  • adapter是一种客户端数据落地的适配,不如你想把数据同步到hbase,你可以直接用adapter,adapter会帮忙你做数据格式转换等,当然也可以通过客户端自己写。

实践

环境版本

  • mysql版本:5.7.24 docker版
  • canal版本:1.1.8-alpha-3

docker部署mysql

 启动一个实例,如下

docker run -d --name mysql-test -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 mysql:5.7.24

修改容器内/etc/mysql/mysql.conf.d/mysqld.cnf的内容,在末尾增加如下3行内容,可以通过docker cp来修改

[mysqld]
pid-file	= /var/run/mysqld/mysqld.pid
socket		= /var/run/mysqld/mysqld.sock
datadir		= /var/lib/mysql
#log-error	= /var/log/mysql/error.log
# By default we only accept connections from localhost
#bind-address	= 127.0.0.1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# 以下3行是新增的
server_id=1
log_bin=mysql-bin
binlog_format=ROW

重新启动容器

docker restart mysql-test

通过客户端(比如DBeaver连接mysql),查看这个配置参数

创建数据库test

创建表sys_user

CREATE TABLE `sys_user` (`user_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户ID',`user_name` varchar(60) NOT NULL COMMENT '用户昵称',`email` varchar(50) DEFAULT '' COMMENT '用户邮箱',`sex` char(1) DEFAULT '0' COMMENT '用户性别(0男 1女 2未知)',`password` varchar(50) DEFAULT '' COMMENT '密码',`status` char(1) DEFAULT '0' COMMENT '帐号状态(0正常 1停用)',PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='用户信息表';

到此mysql的准备工作已结束,重点在于开启bin log,并设置format为ROW,因为canal是作为mysql slave存在的,需要通过binlog来同步数据。

部署canal-server

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.deployer-1.1.8-SNAPSHOT.tar.gz,解压缩得到如下的目录

打开conf/example/instance.properties配置源数据库(mysql)的信息,我这边偷懒直接用root账号,密码来自docker部署mysql启动时设置的密码

进入bin,直接点击startup.bat(windows环境)

然后打开logs/canal/canal.log,如果出现以下信息表示启动成功。接下来就可以通过客户端去订阅了。

例子1:canal-client的使用

在idea中新建一个maven项目,引入client的依赖

        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version></dependency>

新增client类-CanalClient,代码如下:

package com.example.demo.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class CanalClient {public static void main(String[] args) throws InvalidProtocolBufferException {// 1. 创建链接,即server的地址和端口,其中example就是源数据库的名称(来自canal.deployer下的conf下的example文件及名称)CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111), "example", "", "");while (true) {// 2. 获取连接connector.connect();// 3. 指定订阅的数据库testconnector.subscribe("test.*");// 4. 获取MessageMessage message = connector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size() == 0) {System.out.println("没有数据,休息下");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {for (CanalEntry.Entry entry : entries) {// 获取表名String tableName = entry.getHeader().getTableName();// Entry类型CanalEntry.EntryType entryType = entry.getEntryType();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 获取序列化数据ByteString storeValue = entry.getStoreValue();// 反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取事件类型CanalEntry.EventType eventType = rowChange.getEventType();// 获取具体数据List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();// 遍历并打印数据for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();Map<String, Object> bMap = new HashMap<>();for (CanalEntry.Column column : beforeColumnsList) {bMap.put(column.getName(), column.getValue());}List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();Map<String, Object> aMap = new HashMap<>();for (CanalEntry.Column column : afterColumnsList) {aMap.put(column.getName(), column.getValue());}System.out.println("表名:" + tableName + ",操作类型:" + eventType);System.out.println(",变化前:" + bMap);System.out.println(",变化后:" + aMap);}}}}}}
}

启动后,控制台打印如下:

当往数据库插入一条记录

insert sys_user(user_name, email, sex, `password`, status) values('张三','zhangsan@qq.com', 0, '123',0);

结果,控制台打印如下:

当修改记录时,控制台打印如下

update sys_user set `password`='123456' where user_name='张三';

当删除记录时,控制台打印如下

delete from sys_user where user_name='张三';

至此获取到的数据的变化,你可以把他手动转换、处理等等操作,也可以同步到其他的数据库,灵活性很大,但是如果当只是想把数据同步到其他数据库时,可以直接选择使用client-adapter,不需要编码,client-adapter可以认为是针对特定场景的加强版的客户端。

例子2:springboot集成canal-client

这边采用非官方封装的jar包,但是使用很方便,首先新建一个springboot项目(略)

引入以下maven依赖

<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.6-RELEASE</version>
</dependency>

使用很简单,比如同步数据到redis,详细参考官网:GitHub - NormanGyllenhaal/canal-client: spring boot canal starter 易用的canal 客户端 canal client

例子3:同步数据到clickhouse(client-adapter的使用)

clickhouse-adapter是1.18新增的,不知是我的配置还有问题,当sys_user没有id字段时,只有新增能正常同步到clickhouse,故把sys_user.user_id改为id。

先部署clickhouse,在default新建表,建表语句如下:

CREATE TABLE default.test_user
(`id` Int64,`user_name` String,`email` String,`sex` String,`password` String,`status` String
)
ENGINE = MergeTree
PRIMARY KEY (id)  -- 明确指定 PRIMARY KEY
ORDER BY (id)     -- ORDER BY 是必须的
SETTINGS index_granularity = 8192;

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.adapter-1.1.8-SNAPSHOT.tar.gz

解压缩得到如下的目录

在conf下新增clickhouse文件夹,并且新建test_user.yml文件,内容如下:

#dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: clickhouse1
concurrent: true
dbMapping:database: testtable: sys_usertargetTable: default.test_usertargetPk:id: id
#  mapAll: truetargetColumns:id:user_name:email:sex:password:status:  #etlCondition: "where id>={}"commitBatch: 3000 # 批量提交的大小

修改conf下的application.yml,把outerAdapters: 下的配置改为如下:主要是配置下目标clickhouse数据库的信息。

启动bin下的startup.bat(windows系统),当看到如下信息表示启动成功

此时去mysql中新增几条记录,adapter的控制台会打印出对应的信息:

查看clickhouse,会发现记录已同步,删除和修改类似的。

部署canal-admin

新建数据库canal_manager,用于admin,这边偷个懒,还是用root账号

通过地址Release canal-1.1.8-alpha-3 · alibaba/canal · GitHub 下载1.1.8-alpha-3版本得到

canal.admin-1.1.8-SNAPSHOT.tar.gz

解压缩得到如下的目录

把conf下的canal_manager.sql sql导入数据库中,进行数据初始化。

修改conf/application.yml的配置,如下:

server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: rootpassword: 123456driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin

启动bin/startup.bat

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456

关闭admin,接下去要把canal-server对接到admin上,

修改canal-server的conf下的canal_local.properties,内容如下:

# register ip
canal.register.ip =# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name = 

然后在canal-deploy目录下以local的配置启动canal-server

$ bin/startup.bat local

重启canal-admin

进入网页端,能看到如下信息,server管理已经存在一个新的节点了

使用心得

canal定位是一个增量日志同步,但是也有方式可以设置全量同步,但都需要canal-adapter支持

  • canal可以设置全库同步,详见参考:Sync RDB · alibaba/canal Wiki · GitHub
  • 对于表级别的全量同步,可以使用etl接口,详见:ClientAdapter · alibaba/canal Wiki · GitHub
  • 配合datax使用,datax先全量同步,再使用canal增量同步。

题外话

      canal作为一个国内使用非常广的实时同步项目,经过一系列的入门调研到现在想放弃了,因为这个项目的维护不行了,一年只发一个版本,作为竞品debezium和flinkcdc不管是版本发布还是issue的回复都很火热,关键是发现了alpha版的etl bug存在大半年了,还没修复,所以想弃坑,转向flink cdc了。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com