您的位置:首页 > 汽车 > 新车 > 建设官网公司地址_西西美人美体_网站优化排名优化_泰安网站优化公司

建设官网公司地址_西西美人美体_网站优化排名优化_泰安网站优化公司

2025/3/7 7:13:12 来源:https://blog.csdn.net/u010398771/article/details/146069518  浏览:    关键词:建设官网公司地址_西西美人美体_网站优化排名优化_泰安网站优化公司
建设官网公司地址_西西美人美体_网站优化排名优化_泰安网站优化公司

在使用 Flink 进行数据处理时,批量插入(batch insert)数据库是一种常见的优化策略,可以减少数据库压力,提高写入吞吐量。然而,近期在一个 Flink Job的升级过程 中,我们发现新的 job 老是无法实现数据插入到数据库中,  定位到设置 mariadbBatchSize = 100 时,数据并未插入数据库,而修改为 mariadbBatchSize = 1 后,数据却能立刻写入。为什么会这样呢? 又该如何优化批量写入策略?我们对此进行深入探讨。

问题描述

在 Flink 任务中,我们通过 JDBC Sink 将数据写入 MariaDB,并设置批量写入参数 mariadbBatchSize = 100。然而,在流量较小时,数据并未及时插入数据库,直到流量达到一定程度才触发插入。后来,我们将 mariadbBatchSize 改为 1,数据却能立即写入。

问题分析

1. Flink 批量写入的工作机制

Flink 在进行数据库写入时,通常会采用 批量缓冲 + 触发写入 的方式。其基本流程如下:

  1. 数据缓冲:当 mariadbBatchSize = 100 时,Flink 会将数据缓存在内存中,直到达到 100 条数据才触发批量插入。

  2. 触发条件:一般情况下,以下几种情况会触发数据库写入:

    • 数据量达到 batchSize(如 100 条数据)

    • 达到设定的超时时间(如果 Flink Sink 支持该特性)

    • 任务完成或取消时触发 flush

    • Flink Checkpoint 机制(如果开启了 exactly-once 语义)

  3. 提交数据库:Flink 通过 JDBC 连接池执行 executeBatch(),将缓存的数据批量插入 MariaDB。

2. mariadbBatchSize = 100 导致数据未及时插入的原因

  • 流量不足:当数据流量较小时,缓冲区的数据量达不到 100,导致 executeBatch() 不会被触发。

  • 未设置超时机制:部分 JDBC Sink 并未实现超时机制,即即使数据迟迟未满 100 条,也不会自动触发批量插入。

  • 无 Checkpoint 触发:Flink 可能在 Checkpoint 时才会进行 flush,如果 Checkpoint 频率较低,也可能导致数据长时间滞留在缓冲区。

3. mariadbBatchSize = 1 立即插入的原因

  • 由于 mariadbBatchSize = 1,每来一条数据就会立即执行 executeBatch(),无需等待数据积累到 100 条,从而实现了即时写入

  • 但这样做的缺点是:每一条数据都会单独进行数据库插入,可能导致数据库负载过高,写入吞吐量下降。

解决方案与优化策略

为了解决该问题,我们需要在 降低写入延迟提高数据库吞吐量 之间找到平衡点。以下是一些优化建议:

1. 降低 mariadbBatchSize,避免长时间不触发插入

mariadbBatchSize 适当降低,例如 1020,可以在保证批量写入的同时,减少数据等待的时间。

2. 使用定时 flush 机制

如果 Flink 的 JDBC Sink 支持 flushInterval 机制,可以设定一个合理的时间间隔(如 5 秒),即使数据量未满 batchSize,也会定期写入数据库。

JdbcSink.sink("INSERT INTO my_table (field1, field2) VALUES (?, ?)",new JdbcStatementBuilder<MyType>() {@Overridepublic void accept(PreparedStatement ps, MyType value) throws SQLException {ps.setString(1, value.getField1());ps.setInt(2, value.getField2());}},JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(5000) // 设置定时触发间隔.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mariadb://localhost:3306/mydb").withDriverName("org.mariadb.jdbc.Driver").withUsername("user").withPassword("password").build()
)

3. 配合 Checkpoint 进行 flush

如果任务使用了 exactly-once 语义,可以在 Checkpoint 触发时 flush 数据,确保即使流量较低,数据也不会长期滞留。

JdbcExecutionOptions.builder().withBatchSize(100).withBatchIntervalMs(10000) // 每 10 秒 flush 一次.withMaxRetries(3) // 避免批量失败.build()

4. 优化数据库连接池

如果 batchSize 较小,单条写入的数据库连接开销较高,可以使用连接池优化,如 HikariCP:

new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mariadb://localhost:3306/mydb").withDriverName("org.mariadb.jdbc.Driver").withUsername("user").withPassword("password").withConnectionCheckTimeoutSeconds(30) // 避免连接过期.build()

5. 监控数据库写入情况

可以通过 Flink 的 metrics 监控批量插入的行为,确保数据写入是符合预期的。

env.getMetricGroup().counter("mariadb_insert_count");

最常见的方案配合 Checkpoint 进行 flush
在开启 exactly-once 语义时,Flink 任务通常依赖 Checkpoint 来保证数据一致性,同时触发 flush,确保未满 batchSize 的数据也能及时写入数据库。这是 生产环境 中最常见的做法,尤其适用于数据一致性要求高的应用。

最好的方案(综合考虑)定时 flush 机制 + Checkpoint 触发
单独依赖 Checkpoint 可能导致写入延迟过高,因此 结合定时 flush(如 5 秒)和 Checkpoint 触发 是最佳实践。这样可以确保:

  1. 流量大时batchSize 触发批量插入,提高吞吐量。
  2. 流量小时flushInterval 触发写入,避免数据长时间积压。
  3. 支持 Checkpoint,保证 exactly-once 语义,避免数据丢失或重复写入。

优化数据库连接池 主要是为了减少数据库连接开销,适用于所有方案,但并不是解决 batchSize 触发问题的核心方法,而是性能优化的一个补充。

综述:

  • 推荐组合:Checkpoint + 定时 flush
  • 最常见方案:Checkpoint 触发 flush
  • 优化连接池:作为辅助优化,提高数据库性能

结论

在 Flink 中使用 JDBC Sink 批量写入 MariaDB 时,需要权衡 mariadbBatchSize 的大小。如果 batchSize 过大且流量较低,可能导致数据长时间滞留,无法及时插入;如果 batchSize 过小,则可能降低数据库写入吞吐量。最佳实践是结合 定时 flush、Checkpoint 触发、优化连接池 等方式,实现高效稳定的数据写入。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com