

2025/1/11 18:06:49  Source: https://blog.csdn.net/lzhlizihang/article/details/144375701

Table of Contents

  • 1. Physical Partitioning
    • 1.1 Custom Partitioning + Repartitioning (Fixing Data Skew)
  • 2. Sinks
    • 2.1 JDBC Connector
    • 2.2 Kafka Connector
    • 2.3 Custom Sink

1. Physical Partitioning


1.1 Custom Partitioning + Repartitioning (Fixing Data Skew)

Repartitioning (rebalance) can fix data skew: it redistributes elements round-robin, evenly across all downstream subtasks.

package com.bigdata.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Custom partitioner
class MyPartitioner implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        // Keys <= 10000 go to partition 0, all others to partition 1
        if (key <= 10000) {
            return 0;
        }
        return 1;
    }
}

public class CustomPartition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(2);

        DataStreamSource<Long> dataStreamSource = env.fromSequence(1, 15000);

        // Custom partitioning API
        DataStream<Long> dataStream = dataStreamSource.partitionCustom(new MyPartitioner(),
                new KeySelector<Long, Long>() {
                    @Override
                    public Long getKey(Long value) throws Exception {
                        return value;
                    }
                });

        // Count the number of elements in each partition
        dataStream.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int partition = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(partition, 1);
            }
        }).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> tuple) throws Exception {
                return tuple.f0;
            }
        }).sum(1).print("before:");

        // Print the custom-partitioned stream
        // dataStream.print();

        // Repartition
        DataStream<Long> rebalance = dataStream.rebalance();

        // Count the number of elements in each partition after rebalancing
        rebalance.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int partition = getRuntimeContext().getIndexOfThisSubtask();
                return Tuple2.of(partition, 1);
            }
        }).keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> tuple) throws Exception {
                return tuple.f0;
            }
        }).sum(1).print("after:");

        env.execute("Custom partitioning + repartitioning (fixing data skew)");
    }
}
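The skew that MyPartitioner produces can be checked without running a Flink job: the same threshold rule, applied to the sequence 1..15000, sends 10000 elements to partition 0 but only 5000 to partition 1. A minimal plain-Java sketch of that arithmetic (PartitionSkewDemo and its method names are illustrative, not Flink API):

```java
import java.util.stream.LongStream;

// Reproduces the partition-assignment rule from MyPartitioner without Flink,
// to show the skew that rebalance() later evens out.
public class PartitionSkewDemo {
    // Same rule as MyPartitioner.partition: keys <= 10000 go to partition 0.
    static int partitionFor(long key) {
        return key <= 10_000 ? 0 : 1;
    }

    // Count how many of the keys 1..n land in each of the two partitions.
    static long[] counts(long n) {
        long[] c = new long[2];
        LongStream.rangeClosed(1, n).forEach(k -> c[partitionFor(k)]++);
        return c;
    }

    public static void main(String[] args) {
        long[] c = counts(15_000);
        // Partition 0 receives twice as much data as partition 1 -> skew.
        System.out.println("partition 0: " + c[0] + ", partition 1: " + c[1]);
    }
}
```

After `rebalance()`, each of the two subtasks would instead process roughly 7500 elements.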

2. Sinks

Available sinks include print, writeAsText (plain-text output), and Connectors.

2.1 JDBC Connector

Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

Code example:

package com.bigdata.sink;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @program: FlinkDemo
 * @author: hang
 * @create: 2024-11-22 15:55:54
 **/
@Data
@NoArgsConstructor
@AllArgsConstructor
class Student {
    private int id;
    private String name;
    private int age;
}

public class JdbcConnector {
    public static void main(String[] args) throws Exception {
        // 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - load data
        DataStreamSource<Student> studentDataStreamSource = env.fromElements(
                new Student(1, "zhangsan", 18),
                new Student(2, "lisi", 19),
                new Student(3, "wangwu", 20)
        );

        // 3. transformation - process the data
        // 4. sink - write the data out
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUrl("jdbc:mysql://localhost:3306/mydb01")
                .withUsername("root")
                .withPassword("123456")
                .build();

        studentDataStreamSource.addSink(JdbcSink.sink(
                "insert into student values (?,?,?)",
                new JdbcStatementBuilder<Student>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Student student) throws SQLException {
                        preparedStatement.setInt(1, student.getId());
                        preparedStatement.setString(2, student.getName());
                        preparedStatement.setInt(3, student.getAge());
                    }
                },
                // For an unbounded stream you can batch, e.g. insert every two records:
                // JdbcExecutionOptions.builder().withBatchSize(2).build(),
                jdbcConnectionOptions
        ));

        // 5. execute
        env.execute();
    }
}
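The commented-out `JdbcExecutionOptions.builder().withBatchSize(2).build()` line controls batching: rows are buffered and flushed to the database in groups rather than one INSERT per record. The buffering idea can be sketched in plain Java, with no database involved (BatchBuffer is a hypothetical helper for illustration, not a Flink class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Conceptual sketch of batch-size-driven flushing, as done by the JDBC sink's
// execution options. The flusher stands in for one batched INSERT.
public class BatchBuffer<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> flusher;

    public BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    // Add a record; flush automatically once the batch is full.
    public void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Flush any remaining records (a real sink does this on close/checkpoint).
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

With a batch size of 2, three records produce one automatic flush of two rows plus one final flush of the leftover row.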

2.2 Kafka Connector

Requirement: consume log data from Kafka topic1, perform real-time ETL, and write the records whose status is success to Kafka topic2.

package com.bigdata.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @program: FlinkDemo
 * @author: hang
 * @create: 2024-11-22 16:38:58
 **/
public class KafkaConnector {
    public static void main(String[] args) throws Exception {
        // 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - consume from Kafka topic1
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaConsumer);

        // 3. transformation - keep only records containing "success"
        SingleOutputStreamOperator<String> success = dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.contains("success");
            }
        });

        // 4. sink - write to Kafka topic2
        FlinkKafkaProducer<String> kafkaProducer =
                new FlinkKafkaProducer<>("topic2", new SimpleStringSchema(), properties);
        success.addSink(kafkaProducer);

        // 5. execute
        env.execute();
    }
}
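The ETL step itself is independent of Kafka: it is just a predicate that keeps records containing "success". A small stand-alone sketch of that logic on sample log lines (SuccessFilterDemo is an illustrative name, not part of the job above):

```java
import java.util.List;
import java.util.stream.Collectors;

// The filter predicate used in the Kafka job, isolated for testing.
public class SuccessFilterDemo {
    // Same check as the FilterFunction in the job above.
    static boolean isSuccess(String record) {
        return record.contains("success");
    }

    // Apply the ETL filter to a batch of log lines.
    static List<String> etl(List<String> records) {
        return records.stream()
                .filter(SuccessFilterDemo::isSuccess)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> logs = List.of("login success", "login fail", "pay success");
        System.out.println(etl(logs)); // keeps the two success records
    }
}
```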

2.3 Custom Sink

Here we mimic the implementation of the JDBC sink.
Flink already ships an official JdbcSink; we reimplement it here only to learn how to write a custom sink.

package com.bigdata.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

class MyJdbcSink extends RichSinkFunction<Student> {

    Connection conn = null;
    PreparedStatement statement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register the driver. This line can usually be omitted;
        // note that the MySQL 8.0 driver class name contains "cj".
        Class.forName("com.mysql.cj.jdbc.Driver");
        // Get the database Connection
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb01", "root", "123456");
        // Prepare the SQL statement
        statement = conn.prepareStatement("insert into student values (?,?,?)");
    }

    @Override
    public void close() throws Exception {
        // Release resources
        statement.close();
        conn.close();
    }

    @Override
    public void invoke(Student student, Context context) throws Exception {
        statement.setInt(1, student.getId());
        statement.setString(2, student.getName());
        statement.setInt(3, student.getAge());
        statement.execute();
    }
}

public class JdbcCustomSink {
    public static void main(String[] args) throws Exception {
        // 1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - load data
        DataStreamSource<Student> studentDataStreamSource = env.fromElements(
                new Student(4, "zhaoliu", 18),
                new Student(5, "qianqi", 19),
                new Student(6, "wuba", 20)
        );

        // 3. transformation - process the data
        // 4. sink - write via the custom sink
        DataStreamSink<Student> studentDataStreamSink = studentDataStreamSource.addSink(new MyJdbcSink());

        // 5. execute
        env.execute();
    }
}
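The contract a RichSinkFunction must honor is visible in MyJdbcSink: open() acquires resources once per parallel instance, invoke() runs once per record, and close() releases the resources. A database-free mock of that lifecycle, so the contract can be exercised without MySQL (InMemorySink is an illustrative class, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the open/invoke/close lifecycle of a RichSinkFunction,
// writing to an in-memory list instead of a JDBC Connection.
public class InMemorySink {
    private List<String> connection;                 // stands in for the JDBC Connection
    private final List<String> written = new ArrayList<>();

    // Like RichSinkFunction.open: acquire the resource once.
    public void open() {
        connection = written;
    }

    // Like RichSinkFunction.invoke: called once per record.
    public void invoke(String record) {
        if (connection == null) {
            throw new IllegalStateException("open() was not called");
        }
        connection.add(record);
    }

    // Like RichSinkFunction.close: release the resource.
    public void close() {
        connection = null;
    }

    public List<String> written() {
        return written;
    }
}
```

Flink calls these methods in exactly this order, which is why the real sink can safely create the Connection and PreparedStatement in open() and reuse them across every invoke().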
