您的位置:首页 > 科技 > IT业 > 整站下载工具软件_简单免费模板_yandex搜索引擎_百度手机助手网页版

整站下载工具软件_简单免费模板_yandex搜索引擎_百度手机助手网页版

2024/12/28 6:16:56 来源:https://blog.csdn.net/it_erge/article/details/143349882  浏览:    关键词:整站下载工具软件_简单免费模板_yandex搜索引擎_百度手机助手网页版
整站下载工具软件_简单免费模板_yandex搜索引擎_百度手机助手网页版
一、概述

本文介绍了如何使用Apache Flink从Kafka接收数据流,并将处理后的数据写入MySQL数据库。Apache Flink是一个开源的流处理框架,支持高吞吐量的数据处理,并且具备低延迟的特性。本文将详细讲解如何通过Flink连接Kafka和MySQL,以及实现数据流的接收和写入。

二、环境准备
  1. Flink环境:确保已经安装并配置好Apache Flink。
  2. Kafka环境:确保Kafka已经安装并运行,且有一个可用的topic。
  3. MySQL环境:确保MySQL数据库已经安装并运行,且有一个可用的数据库和表。
三、依赖配置

在Flink项目中,需要引入以下依赖:

  • Flink的核心依赖
  • Flink的Table API依赖
  • Kafka连接器依赖
  • JDBC连接器依赖
  • MySQL的JDBC驱动依赖

Maven依赖配置示例如下:

<dependencies>  <!-- Flink核心依赖 -->  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java_2.12</artifactId>  <version>1.19.0</version>  </dependency>  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-api-java-bridge_2.12</artifactId>  <version>1.19.0</version>  </dependency>  <!-- Kafka连接器依赖 -->  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.12</artifactId>  <version>1.19.0</version>  </dependency>  <!-- JDBC连接器依赖 -->  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-jdbc_2.12</artifactId>  <version>1.19.0</version>  </dependency>  <!-- MySQL JDBC驱动依赖 -->  <dependency>  <groupId>mysql</groupId>  <artifactId>mysql-connector-java</artifactId>  <version>8.0.22</version>  </dependency>  
</dependencies>
四、Flink作业实现
package com.iterge.flink.job;import com.alibaba.fastjson2.JSONObject;
import com.iterge.flink.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author iterge* @version 1.0* @date 2024/10/29 17:15* @description mysql示例*/@Slf4j
public class FlinkMysqlDemo {private static final String sql = "insert into t_user(name) values(?)";private static Connection connection;private static PreparedStatement preparedStatement;public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.1").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> stringDataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");DataStream<User> map = stringDataStreamSource.map(x -> {User user = new User();user.setName(x);System.out.println(JSONObject.toJSONString(user));return user;});map.addSink(new RichSinkFunction<User>() {@Overridepublic void invoke(User value, Context context) throws Exception {super.invoke(value, context);try {// 设置 PreparedStatement 的参数//preparedStatement.setInt(1, value.getId());preparedStatement.setString(1, value.getName());// 执行插入操作preparedStatement.executeUpdate();} catch (SQLException e) {// 处理 SQL 异常,例如记录日志或抛出运行时异常System.err.println("Failed to insert data into the database: " + e.getMessage());// 可以选择重新抛出异常以停止作业,或者记录并忽略它// throw new RuntimeException("Database write failed", e);}}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 初始化数据库连接和 PreparedStatementString url = "jdbc:mysql://localhost:3306/test";String user = "root";//String password = "mypassword";connection = DriverManager.getConnection(url, user,null);preparedStatement = connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();// 关闭 PreparedStatement 和 Connectionif (preparedStatement != null) {preparedStatement.close();}if (connection != null) {connection.close();}}});map.print();env.execute("mysql-demo");}
}
五、总结

本文详细介绍了如何使用Apache Flink从Kafka接收数据流,并将处理后的数据写入MySQL数据库。通过配置依赖、创建Flink执行环境、定义Kafka和MySQL表、编写插入语句等步骤,成功实现了数据的接收和写入。在实际应用中,可以根据具体需求对代码进行调整和优化。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com