您的位置:首页 > 新闻 > 资讯 > 网站排名英文怎么说_网站制作基础教程_百度竞价推广费用_接推广怎么收费

网站排名英文怎么说_网站制作基础教程_百度竞价推广费用_接推广怎么收费

2025/3/17 17:31:45 来源:https://blog.csdn.net/weixin_51918722/article/details/146095345  浏览:    关键词:网站排名英文怎么说_网站制作基础教程_百度竞价推广费用_接推广怎么收费
网站排名英文怎么说_网站制作基础教程_百度竞价推广费用_接推广怎么收费

Logstash同步MySQL到ES

前提

软件版本

  • Logstash: 7.10.2

  • ES: 7.10.2

  • MySQL: 5.7

Logstash和ES安装在Windows中, MySQL安装在虚拟机中

配置

user表建表语句

CREATE TABLE `user` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',`name` varchar(255) DEFAULT NULL,`create_date` datetime DEFAULT NULL,`update_date` datetime DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

位置信息

创建userDTS_last_value.txt, 不需要内容, 到时候logstash会写入

同步user数据的userDTS.sql

SELECT id as id, name as name, DATE_FORMAT(create_date,'%Y-%m-%d %H:%i:%S') as createDate, DATE_FORMAT(update_date,'%Y-%m-%d %H:%i:%S') as updateDate 
FROM user
WHERE# 增量数据的查询条件,一条数据怎么才算增量数据就是由这个条件确定的,而其中的::sql_last_value就是第2步中配置项中的last_run_metadata_path里配置的文件路径对应的文件中的值,也就是每次同步数据后最新一条的updateDate值,那么下次同步数据的时候,sql根据这个条件就不会查到非增量数据,同时第2步中配置项中的tracking_column配置的值就是updateDate,实际使用的时候可以根据场景更换这个字段 # 解决Logstash的sql_last_value 和MySQL时区不一致问题update_date >= CONVERT_TZ(:sql_last_value, '+08:00', '+00:00') AND CONVERT_TZ(:sql_last_value, '+08:00', '+00:00') < now()

logstash-mysql-es.conf

注意事项

  1. MySQL驱动器jar包需要指定, 要和MySQL的版本兼容
  2. logstash的同步不是实时同步的, 使用同步可以使用Canel, Flink CDC监听Binlog的方式, 可以参考博主的SpringBoot集成Flink-CDC
  3. 存量更新和存量更新配置根据自己的需求开启
  4. 相关位置改成自己, 比如mysql, sql存放位置
  5. 启动logstash的时候需要指定logstash-mysql-es.conf配置启动
    • 我的配置存放logstash安装目录的config中, 进入bin目录, 执行cmd, 输入启命令为logstash -f …\config\logstash-mysql-es.conf
# 输入源
input {jdbc { # 数据库连接参数jdbc_connection_string => "jdbc:mysql://192.168.132.10:3306/whitebrocade?characterEncoding=UTF-8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai"# mysql用户名jdbc_user => "root"# mysql密码jdbc_password => "12345678"# mysql驱动器jar包, 我这里用的是5.7的MySQLjdbc_driver_library => "E:/software/Maven/repository/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar"# 驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"# 开启分页jdbc_paging_enabled => "true"# 最大页码jdbc_page_size => "1000"# 这个type可以用来做多个输入源和多个输出源区分  这里举例所以只有一个type => "user_DTS" # 用于同步的查询sqlstatement_filepath => "E:/software/Logstash/test_data/userDTS.sql"# 直接书写sql语句# statement => "select * from user"# 加上jdbc时区, 要不然logstash的时间会不准确jdbc_default_timezone => "Asia/Shanghai"# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)lowercase_column_names => "false"# 处理中文乱码问题codec => plain { charset => "UTF-8" }# 日志级别sql_log_level => warn# 如果需要增量更新的话,则需要在input/jdbc下添加如下配置# 如果要使用其它字段追踪, 而不是用时间开启这个配置use_column_value => true# 这个就是追踪字段的类型,只有数值和时间两个类型(numeric和timestamp,默认numeric) 这个值会记录到last_run_metadata_path 配置的文件中 如果配置是numeric 那么默认值为0 如果配置为timestamp 那么默认值为1970年tracking_column_type => "timestamp"# 追踪的字段  这个字段只有在上面的lowercase_column_names配置为false的时候才会区分大小写  因为不配置为false的话默认是true  查询结果字段默认会变成全小写tracking_column => "updateDate"# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;record_last_run => true# 上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值  这个就是增量数据同步的关键last_run_metadata_path => "E:/software/Logstash/test_data/userDTS_last_value.txt"# 是否清除 last_run_metadata_path 的记录,如果为true那么每次都相当于从头开始查询所有的数据库记录clean_run => false# 设置监听间隔  各字段含义秒(由左至右)分、时、天、月、年# 这里配置为每15秒扫描一次MySQLschedule => "*/15 * * * * *"}
}# 过滤
filter {mutate {# 去掉没用字段, 注意了type不要去除掉, 这里需要根据type判断是否同步到ESremove_field  => ["@version", "@timestamp"]}
}# 输出源
output {# 判断类型是否位user_DTS(即是否为user表相关的, 如果是就同步到ES中)if[type] == "user_DTS"{elasticsearch {# elasticsearch urlhosts => ["http://localhost:9200"]# 用户名&密码# user => "whitebrocade"# password => "whitebrocade"# 下面两个参数可以开启更新模式action => "update"doc_as_upsert => true# 索引名index => "user"# 文档id 设置成数据库的iddocument_id => "%{id}"}}stdout {# 以json格式输出到控制台, 用于调试使用, 正式运行时可以注释掉codec => json_lines}
} 

ES创建user索引

Logstash同步的时候, 并不会创建user的索引, 所以这里我们得自己手动创建一下

推荐一个ES的客户端工具,es-clint, 下边使用这工具创建

PUT /user
{"settings": {"number_of_shards": 3,"number_of_replicas": 2},"mappings": {"properties": {"id": {"type": "long"},"name": {"type": "text"},"createDate": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||epoch_millis"},"updateDate": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||epoch_millis"}}}
}

创建结果

image-20250307141251259

image-20250307141348283

测试

现在的MySQL和ES都没有数据

新增

往MySQL中新建一条数据
INSERT INTO user (name, create_date, update_date) 
VALUES ('xm', NOW(), NOW());

image-20250307141701181

logstash日志

image-20250307142724014

查看ES的user索引

image-20250307142748466

修改

修改id为1的数据
UPDATE user 
SETname = 'whiteBrocade',update_date = NOW() 
WHERE id = 1;

image-20250307143205419

logstash日志

image-20250307143249717

查看ES的user索引

image-20250307143322543

参考资料

Running Logstash on Docker | Logstash Reference8.17

Docker 安装 LogStash的详细过程

Docker部署Logstash同步Mysql数据到ES方式

使用docker安装logstash的具体方法

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com