1. Overview
This article describes how to use Apache Flink to consume a data stream from Kafka and write the processed records to a MySQL database. Apache Flink is an open-source stream processing framework that combines high throughput with low latency. The sections below walk through connecting Flink to Kafka and MySQL and implementing the end-to-end receive-and-write pipeline.
2. Environment Setup
- Flink: Apache Flink is installed and configured.
- Kafka: Kafka is installed and running, with an available topic (one can be created with the standard kafka-topics.sh --create tool).
- MySQL: MySQL is installed and running, with an available database and table (see the table sketch after this list).
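The job in section 4 inserts into a table named t_user in a database named test. The exact schema is not part of the original listing, so the following minimal DDL is an assumption inferred from the insert statement insert into t_user(name) values(?); adjust it to your needs:

CREATE DATABASE IF NOT EXISTS test;
USE test;
-- Assumed minimal schema: the job only writes the name column;
-- the id column matches a commented-out setInt(1, value.getId()) call in the sink
CREATE TABLE IF NOT EXISTS t_user (
    id   INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255)
);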
3. Dependency Configuration
The Flink project needs the following dependencies:
- Flink core dependency
- Flink Table API dependency
- Kafka connector dependency
- JDBC connector dependency
- MySQL JDBC driver dependency
An example Maven configuration (note that since Flink 1.15 the Java artifacts no longer carry a Scala suffix such as _2.12, and the Kafka and JDBC connectors are released and versioned separately from Flink itself):
<dependencies>
    <!-- Flink core dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.19.0</version>
    </dependency>
    <!-- Flink Table API dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>1.19.0</version>
    </dependency>
    <!-- Kafka connector dependency (versioned as <connector>-<flink> -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>
    <!-- JDBC connector dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>
    <!-- MySQL JDBC driver dependency -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>
</dependencies>
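The job below also references a User entity (com.iterge.flink.entity.User) that is not shown in the original listing. A minimal sketch that makes the code compile could look like the following; the fields are assumptions inferred from the setName(...) call and the commented-out getId() call, and Lombok's @Data generates the getters and setters:

package com.iterge.flink.entity;

import lombok.Data;

/**
 * Minimal sketch of the User entity assumed by the job (hypothetical).
 * A public class with a no-arg constructor and getters/setters
 * qualifies as a Flink POJO.
 */
@Data
public class User {
    private Integer id;   // assumed; only used in a commented-out line of the sink
    private String name;  // populated from the raw Kafka message
}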
4. Flink Job Implementation
The complete job consumes strings from Kafka, maps each message to a User, and writes the name field to MySQL through a custom RichSinkFunction:
package com.iterge.flink.job;

import com.alibaba.fastjson2.JSONObject;
import com.iterge.flink.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/29 17:15
 * @description MySQL sink example
 */
@Slf4j
public class FlinkMysqlDemo {

    private static final String sql = "insert into t_user(name) values(?)";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source: read string values from the topic, starting at the latest offset
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> stringDataStreamSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Map each Kafka message into a User entity
        DataStream<User> map = stringDataStreamSource.map(x -> {
            User user = new User();
            user.setName(x);
            System.out.println(JSONObject.toJSONString(user));
            return user;
        });

        // Custom JDBC sink: the connection lives for the lifetime of each subtask
        map.addSink(new RichSinkFunction<User>() {
            // Instance fields (not static) so parallel subtasks do not share state;
            // transient because they are created in open(), not serialized with the job
            private transient Connection connection;
            private transient PreparedStatement preparedStatement;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Initialize the database connection and the PreparedStatement
                String url = "jdbc:mysql://localhost:3306/test";
                String user = "root";
                //String password = "mypassword";
                connection = DriverManager.getConnection(url, user, null);
                preparedStatement = connection.prepareStatement(sql);
            }

            @Override
            public void invoke(User value, Context context) throws Exception {
                try {
                    // Bind the PreparedStatement parameters
                    //preparedStatement.setInt(1, value.getId());
                    preparedStatement.setString(1, value.getName());
                    // Execute the insert
                    preparedStatement.executeUpdate();
                } catch (SQLException e) {
                    // Handle the SQL exception, e.g. log it or rethrow as a runtime exception
                    log.error("Failed to insert data into the database: {}", e.getMessage());
                    // Either rethrow to fail the job, or log and continue
                    // throw new RuntimeException("Database write failed", e);
                }
            }

            @Override
            public void close() throws Exception {
                super.close();
                // Release the PreparedStatement and the Connection
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        });

        map.print();
        env.execute("mysql-demo");
    }
}
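The hand-written sink above issues one synchronous insert per record and handles no batching or retries. Since flink-connector-jdbc is already on the classpath, the same write can also be expressed with the connector's built-in JdbcSink, which batches and retries for you. A minimal sketch under the same connection parameters (the batch settings are illustrative; tune them to your workload):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Drop-in replacement for the map.addSink(new RichSinkFunction<User>() {...}) call above
map.addSink(JdbcSink.sink(
        "insert into t_user(name) values(?)",
        // Bind each User to the PreparedStatement
        (statement, user) -> statement.setString(1, user.getName()),
        JdbcExecutionOptions.builder()
                .withBatchSize(100)        // flush every 100 records...
                .withBatchIntervalMs(200)  // ...or every 200 ms, whichever comes first
                .withMaxRetries(3)         // retry transient failures
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("root")
                .build()
));

To smoke-test either variant, send a few lines with kafka-console-producer.sh --topic it.erge.test.topic --bootstrap-server localhost:9092 and check that rows appear in t_user.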
5. Summary
This article showed how to use Apache Flink to consume a data stream from Kafka and write the processed records to a MySQL database: configuring the dependencies, creating the Flink execution environment, building a KafkaSource, and inserting the mapped records through a JDBC sink. In real applications the code should be adapted to the specific requirements, for example by batching writes, handling credentials properly, and deciding whether a failed insert should fail the job or merely be logged.