Flink, Explained Simply - Day 2

Objectives

  1. Master the common DataStream sources
  2. Master the common DataStream transformations
  3. Master the common DataStream sinks
  4. Get a basic understanding of the DataSet API operators

📖 1. The DataStream programming model

  • A DataStream program consists of four parts: Environment, DataSource, Transformation and Sink
    (Figure: the DataStream programming model)
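  • A minimal sketch of those four parts put together (the object name and the socket host "node01" are placeholders, in the spirit of the examples that follow):

    import org.apache.flink.streaming.api.scala._

    object ProgrammingModelSkeleton {
      def main(args: Array[String]): Unit = {
        // 1. Environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. DataSource
        val source: DataStream[String] = env.socketTextStream("node01", 9999)
        // 3. Transformation
        val upper: DataStream[String] = source.map(x => x.toUpperCase)
        // 4. Sink
        upper.print()
        env.execute("ProgrammingModelSkeleton")
      }
    }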

📖 2. Flink DataSource: data sources

2.1 File-based
readTextFile(path)
Reads a text file (following the TextInputFormat rules) line by line and returns each line.
  • Example

    package com.kaikeba.demo1

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    object StreamingSourceFromFile {
      def main(args: Array[String]): Unit = {
        // build the streaming environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // read the data from a text file
        val sourceStream: DataStream[String] = env.readTextFile("d:\\words.txt")

        // import the implicit conversions
        import org.apache.flink.api.scala._

        // process the data
        val result: DataStream[(String, Int)] = sourceStream
          .flatMap(x => x.split(" "))  // split each line on spaces
          .map(x => (x, 1))            // count every word once
          .keyBy(0)                    // group by the word (tuple field 0)
          .sum(1)                      // sum the counts (tuple field 1)

        // save the result
        result.writeAsText("d:\\result")

        // start the job
        env.execute("FlinkStream")
      }
    }
2.2 Socket-based
socketTextStream
Reads data from a socket; elements can be split by a delimiter.
  • Example

    package com.kaikeba.demo1

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    /**
      * Word count over a socket stream
      * (see section 4.2 for the sliding-window variant)
      */
    object FlinkStream {
      def main(args: Array[String]): Unit = {
        // build the streaming environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // read the data from a socket
        val sourceStream: DataStream[String] = env.socketTextStream("node01", 9999)

        // import the implicit conversions
        import org.apache.flink.api.scala._

        // process the data
        val result: DataStream[(String, Int)] = sourceStream
          .flatMap(x => x.split(" "))  // split each line on spaces
          .map(x => (x, 1))            // count every word once
          .keyBy(0)                    // group by the word (tuple field 0)
          .sum(1)                      // sum the counts (tuple field 1)

        // print the result
        result.print()

        // start the job
        env.execute("FlinkStream")
      }
    }
2.3 Collection-based
fromCollection(Collection)
Creates a data stream from a collection; all elements of the collection must have the same type.
  • Example

    package com.kaikeba.demo2

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    object StreamingSourceFromCollection {
      def main(args: Array[String]): Unit = {
        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // import the implicit conversions
        import org.apache.flink.api.scala._

        // prepare the data source -- an array
        val array = Array("hello world", "world spark", "flink test", "spark hive", "test")
        val fromArray: DataStream[String] = environment.fromCollection(array)
        // val value: DataStream[String] = environment.fromElements("hello world")

        val resultDataStream: DataStream[(String, Int)] = fromArray
          .flatMap(x => x.split(" "))
          .map(x => (x, 1))
          .keyBy(0)
          .sum(1)

        // print
        resultDataStream.print()

        // start the job
        environment.execute()
      }
    }
2.4 Custom sources
addSource
Lets you read data from third-party systems.
  • Custom single-parallelism source

    • Extend SourceFunction to define a source with parallelism 1

    • Code

      package com.kaikeba.demo2

      import org.apache.flink.streaming.api.functions.source.SourceFunction
      import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

      /**
        * Custom single-parallelism source
        */
      object MySourceRun {
        def main(args: Array[String]): Unit = {
          val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
          import org.apache.flink.api.scala._

          val getSource: DataStream[Long] = environment.addSource(new MySource).setParallelism(1)
          val resultStream: DataStream[Long] = getSource.filter(x => x % 2 == 0)

          resultStream.setParallelism(1).print()
          environment.execute()
        }
      }

      // extend SourceFunction to define a single-parallelism source
      class MySource extends SourceFunction[Long] {
        private var number = 1L
        private var isRunning = true

        override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
          while (isRunning) {
            number += 1
            sourceContext.collect(number)
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {
          isRunning = false
        }
      }
  • Custom multi-parallelism source

    • Extend ParallelSourceFunction to define a source that can run with a parallelism greater than 1

    • Code

      package com.kaikeba.demo2

      import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
      import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

      /**
        * Multi-parallelism source
        */
      object MyMultipartSourceRun {
        def main(args: Array[String]): Unit = {
          // build the streaming environment
          val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
          import org.apache.flink.api.scala._

          // add the source
          val getSource: DataStream[Long] = environment.addSource(new MultipartSource).setParallelism(2)

          // process
          val resultStream: DataStream[Long] = getSource.filter(x => x % 2 == 0)

          resultStream.setParallelism(2).print()
          environment.execute()
        }
      }

      // extend ParallelSourceFunction to define a multi-parallelism source
      class MultipartSource extends ParallelSourceFunction[Long] {
        private var number = 1L
        private var isRunning = true

        override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
          // loop on the isRunning flag so that cancel() actually stops the source
          while (isRunning) {
            number += 1
            sourceContext.collect(number)
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {
          isRunning = false
        }
      }
  • In addition, Flink ships with a set of built-in connectors, each providing the corresponding source support

    • Apache Kafka (source/sink) — analyzed in detail later
    • Apache Cassandra (sink)
    • Amazon Kinesis Streams (source/sink)
    • Elasticsearch (sink)
    • Hadoop FileSystem (sink)
    • RabbitMQ (source/sink)
    • Apache NiFi (source/sink)
    • Twitter Streaming API (source)

📖 3. Flink sinks (data targets)

  • writeAsText(): writes the elements line by line as strings, obtained by calling toString() on each element
  • print() / printToErr(): prints the toString() value of each element to standard output or standard error
  • Custom output with addSink (Kafka, Redis, ...)
  • A sink operator lets us send the data wherever we need, for example to Kafka, Redis or HBase. So far we have only printed results with print(); next we implement a custom sink that writes the data to Redis.
    • Apache Kafka (source/sink)
    • Apache Cassandra (sink)
    • Amazon Kinesis Streams (source/sink)
    • Elasticsearch (sink)
    • Hadoop FileSystem (sink)
    • RabbitMQ (source/sink)
    • Apache NiFi (source/sink)
    • Twitter Streaming API (source)
    • Google PubSub (source/sink)
3.1 Writing data from Flink to Redis
  • Add the flink-redis connector dependency

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>

  • Code

    package com.kaikeba.demo2

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.redis.RedisSink
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
    import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

    /**
      * A streaming job that writes its result to Redis
      */
    object Stream2Redis {
      def main(args: Array[String]): Unit = {
        // get the execution environment
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._

        // prepare the data
        val streamSource: DataStream[String] = executionEnvironment.fromElements("1 hadoop", "2 spark", "3 flink")

        // wrap the data into key/value tuples
        val tupleValue: DataStream[(String, String)] = streamSource.map(x => (x.split(" ")(0), x.split(" ")(1)))

        // configure the Redis client
        val builder = new FlinkJedisPoolConfig.Builder
        builder.setHost("node01")
        builder.setPort(6379)
        builder.setTimeout(5000)
        builder.setMaxTotal(50)
        builder.setMaxIdle(10)
        builder.setMinIdle(5)
        val config: FlinkJedisPoolConfig = builder.build()

        // build the Redis sink
        val redisSink = new RedisSink[Tuple2[String, String]](config, new MyRedisMapper)

        // attach our custom sink
        tupleValue.addSink(redisSink)

        // run the job
        executionEnvironment.execute("redisSink")
      }
    }

    // define a RedisMapper
    class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
      override def getCommandDescription: RedisCommandDescription = {
        // the Redis command used to insert the data
        new RedisCommandDescription(RedisCommand.SET)
      }

      // the key
      override def getKeyFromData(data: (String, String)): String = {
        data._1
      }

      // the value
      override def getValueFromData(data: (String, String)): String = {
        data._2
      }
    }

📖 4. DataStream transformation operators

  • A transformation produces a new DataStream from one or more DataStreams. Each kind of operation is defined as a distinct Operator, and a Flink program can compose multiple transformations into a dataflow topology.

  • DataStream transformation operators in the official documentation:

    https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/index.html

4.1 map, filter
package com.kaikeba.demo3

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object MapFilter {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val sourceStream: DataStream[Int] = environment.fromElements(1, 2, 3, 4, 5, 6)

    // multiply every element by 10, then keep only the even ones
    val mapStream: DataStream[Int] = sourceStream.map(x => x * 10)
    val resultStream: DataStream[Int] = mapStream.filter(x => x % 2 == 0)

    resultStream.print()
    environment.execute()
  }
}
4.2 flatMap, keyBy, sum
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Sliding window example:
  * every second, count the occurrences of each word over the last 2 seconds
  */
object FlinkStream {
  def main(args: Array[String]): Unit = {
    // get the execution environment
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // read data from a socket
    val resultDataStream: DataStream[String] = environment.socketTextStream("node01", 9999)

    // import the implicit conversions
    import org.apache.flink.api.scala._

    // compute the windowed word count
    val resultData: DataStream[(String, Int)] = resultDataStream
      .flatMap(x => x.split(" "))                    // split on spaces
      .map(x => (x, 1))                              // count each occurrence as 1
      .keyBy(0)                                      // group by the word (tuple field 0)
      .timeWindow(Time.seconds(2), Time.seconds(1))  // every second, look at the last 2 seconds
      .sum(1)

    resultData.print()

    // run the job
    environment.execute()
  }
}
4.3 reduce
  • Rolls up the records of a KeyedStream with a user-defined ReduceFunction, emitting the running aggregate
package com.kaikeba.demo3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}

object ReduceStream {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val sourceStream: DataStream[(String, Int)] =
      environment.fromElements(("a", 1), ("a", 2), ("b", 2), ("a", 3), ("c", 2))

    // key by the first tuple field, then sum the second field per key
    val keyByStream: KeyedStream[(String, Int), Tuple] = sourceStream.keyBy(0)
    val resultStream: DataStream[(String, Int)] = keyByStream.reduce((t1, t2) => (t1._1, t1._2 + t2._2))

    resultStream.print()
    environment.execute()
  }
}
4.4 union
  • Merges two streams into one; both streams must have the same element type
package com.kaikeba.demo3

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
  * union operator
  */
object UnionStream {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val firstStream: DataStream[String] = environment.fromCollection(Array("hello spark", "hello flink"))
    val secondStream: DataStream[String] = environment.fromCollection(Array("hadoop spark", "hive flink"))

    // merge the two streams into one; the element types must match
    val resultStream: DataStream[String] = firstStream.union(secondStream)

    resultStream.print()
    environment.execute()
  }
}
4.5 connect
  • Similar to union, but it connects exactly two streams, and the two streams may have different element types
package com.kaikeba.demo3

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
  * Similar to union, but only two streams can be connected, their element types may differ,
  * and a different processing function is applied to each of the two streams
  */
object ConnectStream {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    import org.apache.flink.api.scala._

    val firstStream: DataStream[String] = environment.fromCollection(Array("hello world", "spark flink"))
    val secondStream: DataStream[Int] = environment.fromCollection(Array(1, 2, 3, 4))

    // connect the two DataStreams
    val connectStream: ConnectedStreams[String, Int] = firstStream.connect(secondStream)

    // one map function per input stream
    val unionStream: DataStream[Any] = connectStream.map(x => x + "abc", y => y * 2)

    val coFlatMapStream: DataStream[String] = connectStream.flatMap(new CoFlatMapFunction[String, Int, String] {
      // applied to the elements of the first stream
      override def flatMap1(value: String, out: Collector[String]): Unit = {
        out.collect(value.toUpperCase())
      }

      // applied to the elements of the second stream
      override def flatMap2(value: Int, out: Collector[String]): Unit = {
        out.collect(value * 2 + "")
      }
    })

    unionStream.print()
    coFlatMapStream.print()
    environment.execute()
  }
}
4.6 split, select
  • Splits one data stream into several streams according to some rule
package com.kaikeba.demo3

/**
  * Splits one data stream into several streams according to some rule.
  * Typical use case: the source stream mixes several kinds of data that need different processing
  * rules, so the stream is split by some criterion and each resulting stream gets its own logic.
  */
import java.{lang, util}

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.{DataStream, SplitStream, StreamExecutionEnvironment}

object SplitAndSelect {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    import org.apache.flink.api.scala._

    // build the DataStream
    val firstStream: DataStream[String] = environment.fromCollection(Array("hadoop hive", "spark flink"))

    val selectStream: SplitStream[String] = firstStream.split(new OutputSelector[String] {
      override def select(value: String): lang.Iterable[String] = {
        val list = new util.ArrayList[String]()
        if (value.contains("hadoop")) {
          // elements containing "hadoop" go to the stream named "first"
          list.add("first")
        } else {
          // everything else goes to the stream named "second"
          list.add("second")
        }
        list
      }
    })

    // select and print the "first" stream
    selectStream.select("first").print()
    environment.execute()
  }
}
4.7 Repartitioning operators
  • Repartitioning operators let us redistribute the data across partitions, for example to mitigate data skew
    • Random partitioning

      • Random partitioning
        • Elements are distributed randomly following a uniform distribution (similar to random.nextInt(5): the values 0 to 4 are equally likely)
        • dataStream.shuffle()
    • Rebalancing

      • Round-robin partitioning
        • Elements are distributed round-robin so that every partition receives an equal load; useful for performance tuning when the data is skewed
        • Rebalances and repartitions the data set, eliminating data skew
        • dataStream.rebalance()
    • Rescaling

      • Similar to rebalance, but not global: data is redistributed only within a single node, so nothing is transferred across the network

        • dataStream.rescale()
    • Custom partitioning

      • A custom partitioner has to implement the Partitioner interface
        • dataStream.partitionCustom(partitioner, "someKey")
        • or dataStream.partitionCustom(partitioner, 0)
    • Broadcasting: broadcast, covered in detail later

4.7.1 Repartitioning the data after a filter
package com.kaikeba.demo4

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
  * Repartition the data after a filter
  */
object FlinkPartition {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val dataStream: DataStream[Int] = environment.fromCollection(1 to 100)

    val filterStream: DataStream[Int] = dataStream
      .filter(x => x > 10)
      //.shuffle    // randomly redistribute the upstream data across the downstream partitions
      //.rescale
      .rebalance    // repartition the data round-robin; this involves a shuffle

    val resultStream: DataStream[(Int, Int)] = filterStream.map(new RichMapFunction[Int, (Int, Int)] {
      override def map(value: Int): (Int, Int) = {
        // emit the subtask index together with the value
        (getRuntimeContext.getIndexOfThisSubtask, value)
      }
    })

    resultStream.print()
    environment.execute()
  }
}
4.7.2 Custom partitioning strategy
  • If none of the partitioning schemes above fits our needs, we can define our own partitioning strategy

  • Requirement

    • Define a custom partitioning strategy so that different records go to different partitions: strings containing "hello" go to one partition, everything else goes to another
  • Define the partitioner

    import org.apache.flink.api.common.functions.Partitioner

    class MyPartitioner extends Partitioner[String] {
      override def partition(line: String, num: Int): Int = {
        println("number of partitions: " + num)
        if (line.contains("hello")) {
          0
        } else {
          1
        }
      }
    }

  • Define the job class that uses the partitioner

    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

    object FlinkCustomerPartition {
      def main(args: Array[String]): Unit = {
        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._

        // build the DataStream
        val sourceStream: DataStream[String] =
          environment.fromElements("hello laowang", "spark flink", "hello tony", "hive hadoop")

        // apply the custom partitioner, using the whole line as the key
        val rePartition: DataStream[String] = sourceStream.partitionCustom(new MyPartitioner, x => x + "")

        rePartition.map(x => {
          println("key: " + x + ", thread: " + Thread.currentThread().getId)
          x
        })

        rePartition.print()
        environment.execute()
      }
    }
    

📖 5. DataSet transformation operators

DataSet transformation operators in the official documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/index.html#dataset-transformations

  • Map

    • Takes one element and returns one element; useful for cleaning and converting data (a short sketch of the simpler operators follows after this list)
  • FlatMap

    • Takes one element and may return zero, one or many elements
  • MapPartition

    • Like map, but processes one partition at a time (prefer MapPartition when the map function needs to acquire an external resource such as a connection)
  • Filter

    • Evaluates a predicate for each element and keeps the elements for which it returns true
  • Reduce

    • Aggregates the data by combining the current element with the value returned by the previous reduce call, producing a new value
  • Aggregate

    • sum, max, min, and so on
  • Distinct

    • Returns the distinct elements of a data set: data.distinct()
  • Join

    • Inner join
  • OuterJoin

    • Outer join
  • Cross

    • Builds the Cartesian product of two data sets
  • Union

    • Returns the union of two data sets; the element types must match
  • First-n

    • Returns the first N elements of a data set
  • Sort Partition

    • Locally sorts all partitions of a data set; sorting on multiple fields is done by chaining sortPartition() calls
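The simpler DataSet operators (Map, Filter, Reduce, Union) are not shown in the subsections below, so here is a small sketch of them in the style of this section's examples; the object name and sample data are placeholders:

import org.apache.flink.api.scala._

object SimpleDataSetOps {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val a: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)
    val b: DataSet[Int] = env.fromElements(6, 7, 8)

    val doubled = a.map(x => x * 2)                // Map: one element in, one element out
    val evens = doubled.filter(x => x % 2 == 0)    // Filter: keep elements matching the predicate
    val total = evens.reduce((x, y) => x + y)      // Reduce: rolling aggregation into a single value
    total.print()

    a.union(b).print()                             // Union: element types must match
  }
}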
5.1 mapPartition
package com.kaikeba.demo5

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

import scala.collection.mutable.ArrayBuffer

object MapPartitionDataSet {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val arrayBuffer = new ArrayBuffer[String]()
    arrayBuffer += "hello world1"
    arrayBuffer += "hello world2"
    arrayBuffer += "hello world3"
    arrayBuffer += "hello world4"

    val collectionDataSet: DataSet[String] = environment.fromCollection(arrayBuffer)

    val resultPartition: DataSet[String] = collectionDataSet.mapPartition(eachPartition => {
      eachPartition.map(eachLine => {
        val returnValue = eachLine + " result"
        returnValue
      })
    })

    resultPartition.print()
  }
}
5.2 distinct
package com.kaikeba.demo5

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

import scala.collection.mutable.ArrayBuffer

object DistinctDataSet {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val arrayBuffer = new ArrayBuffer[String]()
    arrayBuffer += "hello world1"
    arrayBuffer += "hello world2"
    arrayBuffer += "hello world3"
    arrayBuffer += "hello world4"

    val collectionDataSet: DataSet[String] = environment.fromCollection(arrayBuffer)

    val dsDataSet: DataSet[String] = collectionDataSet.flatMap(x => x.split(" ")).distinct()

    dsDataSet.print()
  }
}
5.3 join
package com.kaikeba.demo5

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, UnfinishedJoinOperation}

import scala.collection.mutable.ArrayBuffer

object JoinDataSet {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val array1 = ArrayBuffer((1, "张三"), (2, "李四"), (3, "王五"))
    val array2 = ArrayBuffer((1, "18"), (2, "35"), (3, "42"))

    val firstDataStream: DataSet[(Int, String)] = environment.fromCollection(array1)
    val secondDataStream: DataSet[(Int, String)] = environment.fromCollection(array2)

    val joinResult: UnfinishedJoinOperation[(Int, String), (Int, String)] = firstDataStream.join(secondDataStream)

    // where() selects the key field of the left data set, equalTo() the matching field of the right data set
    val resultDataSet: DataSet[(Int, String, String)] = joinResult.where(0).equalTo(0).map(x => {
      (x._1._1, x._1._2, x._2._2)
    })

    resultDataSet.print()
  }
}
5.4 leftOuterJoin, rightOuterJoin
package com.kaikeba.demo5

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, JoinFunctionAssigner, UnfinishedOuterJoinOperation}

import scala.collection.mutable.ArrayBuffer

object OutJoinDataSet {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val array1 = ArrayBuffer((1, "张三"), (2, "李四"), (3, "王五"), (4, "张飞"))
    val array2 = ArrayBuffer((1, "18"), (2, "35"), (3, "42"), (5, "50"))

    val firstDataStream: DataSet[(Int, String)] = environment.fromCollection(array1)
    val secondDataStream: DataSet[(Int, String)] = environment.fromCollection(array2)

    // left outer join
    val leftOuterJoin: UnfinishedOuterJoinOperation[(Int, String), (Int, String)] =
      firstDataStream.leftOuterJoin(secondDataStream)

    // where() selects the key field of the left data set, equalTo() the matching field of the right data set
    val leftDataSet: JoinFunctionAssigner[(Int, String), (Int, String)] = leftOuterJoin.where(0).equalTo(0)

    // apply a join function to the matched pairs
    val leftResult: DataSet[(Int, String, String)] =
      leftDataSet.apply(new JoinFunction[(Int, String), (Int, String), (Int, String, String)] {
        override def join(left: (Int, String), right: (Int, String)): (Int, String, String) = {
          val result = if (right == null) {
            Tuple3[Int, String, String](left._1, left._2, "null")
          } else {
            Tuple3[Int, String, String](left._1, left._2, right._2)
          }
          result
        }
      })

    leftResult.print()

    // right outer join
    val rightOuterJoin: UnfinishedOuterJoinOperation[(Int, String), (Int, String)] =
      firstDataStream.rightOuterJoin(secondDataStream)

    // where() selects the key field of the left data set, equalTo() the matching field of the right data set
    val rightDataSet: JoinFunctionAssigner[(Int, String), (Int, String)] = rightOuterJoin.where(0).equalTo(0)

    // apply a join function to the matched pairs
    val rightResult: DataSet[(Int, String, String)] =
      rightDataSet.apply(new JoinFunction[(Int, String), (Int, String), (Int, String, String)] {
        override def join(left: (Int, String), right: (Int, String)): (Int, String, String) = {
          val result = if (left == null) {
            Tuple3[Int, String, String](right._1, right._2, "null")
          } else {
            Tuple3[Int, String, String](right._1, right._2, left._2)
          }
          result
        }
      })

    rightResult.print()
  }
}
5.5 cross
package com.kaikeba.demo5

import org.apache.flink.api.scala.{CrossDataSet, DataSet, ExecutionEnvironment}

import scala.collection.mutable.ArrayBuffer

object CrossJoinDataSet {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val array1 = ArrayBuffer((1, "张三"), (2, "李四"), (3, "王五"), (4, "张飞"))
    val array2 = ArrayBuffer((1, "18"), (2, "35"), (3, "42"), (5, "50"))

    val firstDataStream: DataSet[(Int, String)] = environment.fromCollection(array1)
    val secondDataStream: DataSet[(Int, String)] = environment.fromCollection(array2)

    // cross: Cartesian product of the two data sets
    val crossDataSet: CrossDataSet[(Int, String), (Int, String)] = firstDataStream.cross(secondDataStream)

    crossDataSet.print()
  }
}
5.6 first-n and sortPartition
package com.kaikeba.demo5

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

import scala.collection.mutable.ArrayBuffer

object TopNAndPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // the data
    val array = ArrayBuffer((1, "张三", 10), (2, "李四", 20), (3, "王五", 30), (3, "赵6", 40))

    val collectionDataSet: DataSet[(Int, String, Int)] = environment.fromCollection(array)

    // take the first 3 elements
    collectionDataSet.first(3).print()

    collectionDataSet
      .groupBy(0)                      // group by the first field
      .sortGroup(2, Order.DESCENDING)  // sort each group by the third field
      .first(1)                        // take the first element of each group
      .print()

    /**
      * Sort without grouping, over all elements:
      * first field descending, third field ascending
      */
    collectionDataSet
      .sortPartition(0, Order.DESCENDING)
      .sortPartition(2, Order.ASCENDING)
      .print()
  }
}
5.7 Partitioning operators
package com.kaikeba.demo5

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

import scala.collection.mutable.ArrayBuffer

object PartitionDataSet {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val array = ArrayBuffer(
      (1, "hello"),
      (2, "hello"), (2, "hello"),
      (3, "hello"), (3, "hello"), (3, "hello"),
      (4, "hello"), (4, "hello"), (4, "hello"), (4, "hello"),
      (5, "hello"), (5, "hello"), (5, "hello"), (5, "hello"), (5, "hello"),
      (6, "hello"), (6, "hello"), (6, "hello"), (6, "hello"), (6, "hello"), (6, "hello"))

    environment.setParallelism(2)

    val sourceDataSet: DataSet[(Int, String)] = environment.fromCollection(array)

    // partitionByHash: hash-partition by the given field
    sourceDataSet.partitionByHash(0).mapPartition(eachPartition => {
      // log the thread id for each element and forward the element
      // (map rather than foreach, so the iterator is not exhausted before it is returned)
      eachPartition.map(t => {
        println("current thread id: " + Thread.currentThread().getId + "=============" + t._1)
        t
      })
    }).print()

    // partitionByRange: range-partition by the given field
    sourceDataSet.partitionByRange(x => x._1).mapPartition(eachPartition => {
      eachPartition.map(t => {
        println("current thread id: " + Thread.currentThread().getId + "=============" + t._1)
        t
      })
    }).print()
  }
}

📖 6. Flink DataSet connectors

See the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/connectors.html
6.1 File system connectors
  • To read data from a file system, Flink has built-in support for the following:

    File system    Schema       Notes
    HDFS           hdfs://      HDFS file system
    S3             s3://        Supported via the Hadoop file system implementation
    MapR FS        maprfs://    The user has to add the required jars
    Alluxio        alluxio://   Supported via the Hadoop file system implementation
  • Note
    • Flink allows the use of any file system that implements the org.apache.hadoop.fs.FileSystem interface, for example S3, Google Cloud Storage Connector for Hadoop, Alluxio, XtreemFS, FTP and so on
    • Flink is interface-compatible with Apache Hadoop MapReduce, which allows reusing code written for Hadoop MapReduce (a sketch of reading with a Hadoop InputFormat follows below):
      • use Hadoop Writable data types
      • use any Hadoop InputFormat as a DataSource (Flink ships with a HadoopInputFormat wrapper)
      • use any Hadoop OutputFormat as a DataSink (Flink ships with a HadoopOutputFormat wrapper)
      • use a Hadoop Mapper as a FlatMapFunction
      • use a Hadoop Reducer as a GroupReduceFunction
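A minimal sketch of the Hadoop InputFormat route, based on the flink-hadoop-compatibility module (added as a dependency in section 6.2 below); the HDFS address and path are placeholders:

import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object HadoopInputFormatExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // wrap a classic Hadoop TextInputFormat and use it as a Flink DataSource
    val input: DataSet[(LongWritable, Text)] = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat, classOf[LongWritable], classOf[Text],
        "hdfs://node01:8020/input/words.txt"))

    // keep only the line text and show the first few lines
    input.map(pair => pair._2.toString).first(10).print()
  }
}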
6.2 Reading data from HBase with Flink

Flink can also integrate directly with HBase, using HBase as a Flink source or sink.

  • Step 1: create the HBase table and insert some data
create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0001','f1:age','18'
  • Step 2: add the integration dependencies
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <!-- there is no 1.9.2 release of this artifact yet -->
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
  • Step 3: read data from HBase with Flink
package com.kaikeba.demo6

import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.flink.api.java.tuple

/**
  * Flink reads data from an HBase table
  */
object FlinkReadHBase {
  def main(args: Array[String]): Unit = {
    // get the batch environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // add the source through an InputFormat
    val hbaseDataSet = env.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {

      // initialization: set up the connection, table and scan
      override def configure(parameters: Configuration): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        val conn: Connection = ConnectionFactory.createConnection(conf)
        table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
        scan = new Scan() {
          addFamily(Bytes.toBytes("f1"))
        }
      }

      override def getTableName: String = {
        "hbasesource"
      }

      override def getScanner: Scan = {
        scan
      }

      // map an HBase Result to a Tuple2
      override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
        // the rowkey
        val rowkey: String = Bytes.toString(result.getRow)
        val rawCells: Array[Cell] = result.rawCells()
        val sb = new StringBuffer()
        for (cell <- rawCells) {
          val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
          sb.append(value).append(",")
        }
        val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString

        val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
        // assign the tuple fields by index
        tuple2.setField(rowkey, 0)
        tuple2.setField(valueString, 1)
        tuple2
      }
    })

    hbaseDataSet.print()
  }
}
6.3 Reading data with Flink and writing it to HBase

Flink can also integrate with HBase to write data into HBase. There are two ways to do it:

  1. Option 1: implement the OutputFormat interface

  2. Option 2: extend RichSinkFunction and override its methods (a sketch of this variant follows after the OutputFormat example)

package com.kaikeba.demo6

import java.util

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

/**
  * Flink writes data to an HBase table
  */
object FlinkWriteHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // prepare the data
    val sourceDataSet: DataSet[String] = environment.fromElements("0002,lisi,28", "0003,wangwu,30")

    // write the data to HBase through the OutputFormat interface
    sourceDataSet.output(new HBaseOutputFormat)
    environment.execute()
  }
}

// implement the OutputFormat interface
class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "node01,node02,node03"
  val port = "2181"
  var conn: Connection = null

  override def configure(parameters: Configuration): Unit = {}

  override def open(taskNumber: Int, numTasks: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
  }

  // write one record
  override def writeRecord(record: String): Unit = {
    val tableName: TableName = TableName.valueOf("hbasesource")
    val cf1 = "f1"
    val array: Array[String] = record.split(",")

    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))

    val puts = new util.ArrayList[Put]()
    puts.add(put)

    // use a 1 MB write buffer; the data is flushed to HBase automatically once the buffer is full
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    params.writeBufferSize(1024 * 1024)

    val mutator: BufferedMutator = conn.getBufferedMutator(params)
    mutator.mutate(puts)
    mutator.flush()
    puts.clear()
  }

  override def close(): Unit = {
    if (null != conn) {
      conn.close()
    }
  }
}
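
The second option (extending RichSinkFunction) belongs to the DataStream API and is not shown above; the following is only a rough sketch in the same spirit, reusing the hbasesource table and the f1 column family, with the object and class names being placeholders:

package com.kaikeba.demo6

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

/**
  * Sketch of option 2: a streaming job whose sink extends RichSinkFunction
  */
object FlinkWriteHBaseWithSinkFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source: DataStream[String] = env.fromElements("0004,zhaoliu,25", "0005,tianqi,33")
    source.addSink(new HBaseSinkFunction)
    env.execute("FlinkWriteHBaseWithSinkFunction")
  }
}

class HBaseSinkFunction extends RichSinkFunction[String] {
  private var conn: Connection = _
  private var table: Table = _

  // open the HBase connection once per parallel sink instance
  override def open(parameters: Configuration): Unit = {
    val config = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(TableName.valueOf("hbasesource"))
  }

  // write one Put per incoming record of the form "rowkey,name,age"
  override def invoke(record: String): Unit = {
    val fields = record.split(",")
    val put = new Put(Bytes.toBytes(fields(0)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(fields(1)))
    put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(fields(2)))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (conn != null) conn.close()
  }
}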

📖 7. Flink broadcast variables

  • Concept
    Broadcast variables let the programmer keep a read-only cached variable on every machine, instead of shipping a copy of it with every task. Once created, a broadcast variable can be used by any function running in the cluster without being sent to the nodes again, and it must not be modified, so that every node sees the same value.
    In one sentence: it is a shared, read-only variable. We broadcast a DataSet and every task on every node can then read it, with only one copy kept per node. Without broadcast, every task on a node keeps its own copy of the data set, which wastes memory (a single node may end up holding several copies).
  • Usage
    (1) Prepare the data to broadcast
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
    (2) Broadcast the data
        .withBroadcastSet(toBroadcast, "broadcastSetName");
    (3) Access the broadcast data
        Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    Notes:
    a) The broadcast data set lives in the memory of every node, so it must not be too large; it stays in memory until the program finishes.
    b) A broadcast variable cannot be modified after it has been broadcast, which guarantees that every node sees the same data.
  • Example
package com.kaikeba.demo6

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ArrayBuffer

/**
  * Broadcast variable example
  */
object FlinkBroadCast {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // the data set to broadcast
    val userInfo = ArrayBuffer(("zs", 10), ("ls", 20), ("ww", 30))
    val userDataSet: DataSet[(String, Int)] = environment.fromCollection(userInfo)

    // the main data
    val data = environment.fromElements("zs", "ls", "ww")

    // a RichMapFunction is needed to access the broadcast variable
    val result = data.map(new RichMapFunction[String, String] {

      // list that receives the broadcast variable in open()
      var listData: java.util.List[(String, Int)] = null
      // map that stores the broadcast contents
      var allMap = Map[String, Int]()

      // the broadcast variable can be fetched in open()
      override def open(parameters: Configuration): Unit = {
        // get the value of the broadcast variable "broadcastMapName"
        listData = getRuntimeContext.getBroadcastVariable[(String, Int)]("broadcastMapName")
        val it = listData.iterator()
        while (it.hasNext) {
          val tuple = it.next()
          allMap += (tuple._1 -> tuple._2)
        }
      }

      // use the broadcast data
      override def map(name: String): String = {
        val age = allMap.getOrElse(name, 20)
        name + "," + age
      }
    }).withBroadcastSet(userDataSet, "broadcastMapName")

    result.print()
  }
}

📖 8. Flink counters (accumulators)

  • Concept
    An accumulator works much like a MapReduce counter: it is a convenient way to observe how data changes while a task runs. Accumulators can be updated from the operator functions of a Flink job, but the final result only becomes available after the job has finished.
    Counter is a concrete accumulator implementation; Flink provides IntCounter, LongCounter and DoubleCounter.
  • Usage
    (1) Create an accumulator
        val counter = new IntCounter()
    (2) Register the accumulator
        getRuntimeContext.addAccumulator("num-lines", counter)
    (3) Use the accumulator
        counter.add(1)
    (4) Get the accumulator result
        myJobExecutionResult.getAccumulatorResult("num-lines")
  • Example
    • Requirement
      • Use a counter to count how often the keyword "Exception" appears in a file
package com.kaikeba.demo6

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * Use a counter to count how often the keyword "exception" appears in a file
  */
object FlinkCounterAndAccumulator {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // count how many times "exception" appears in the Tomcat log
    val sourceDataSet: DataSet[String] = env.readTextFile("E:\\catalina.out")

    sourceDataSet.map(new RichMapFunction[String, String] {

      // create the accumulator
      var counter = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        // register the accumulator
        getRuntimeContext.addAccumulator("my-accumulator", counter)
      }

      // business logic
      override def map(value: String): String = {
        if (value.toLowerCase().contains("exception")) {
          // increment the accumulator when the line matches
          counter.add(1)
        }
        value
      }
    }).writeAsText("E:\\test")

    val job = env.execute()

    // fetch and print the accumulator value
    val count = job.getAccumulatorResult[Long]("my-accumulator")
    println(count)
  }
}

📖 9. Distributed cache

  • Concept
    Flink provides a distributed cache similar to Hadoop's, which lets user code in parallel functions read a local file conveniently.
    The broadcast variables discussed above keep shared data in TaskManager memory, whereas the distributed cache loads a file or directory from an external system (for example HDFS) and copies it to the local disk of every TaskManager.
  • Usage
    (1) Register a distributed cache file on the execution environment
        env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
    (2) Access the cached file
        File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
  • Example
package com.kaikeba.demo6

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * Distributed cache example
  */
object FlinkDistributedCache {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // the score data set: (student id, subject, score)
    val scoreDataSet = env.fromElements((1, "语文", 50), (2, "数学", 60), (3, "英文", 80))

    // 1. register the distributed cache file
    env.registerCachedFile("E:\\distribute_cache_student.txt", "student")

    // map the scores from (student id, subject, score) to (student name, subject, score)
    val result: DataSet[(String, String, Int)] =
      scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {

        var list: List[(Int, String)] = _

        // initialization: read the cached file
        override def open(parameters: Configuration): Unit = {
          // get the cached file
          val file = getRuntimeContext.getDistributedCache.getFile("student")

          // read its contents
          import scala.collection.JavaConverters._
          val listData: List[String] = FileUtils.readLines(file).asScala.toList

          // turn each line into a tuple (student id, student name)
          list = listData.map { line =>
            val array = line.split(",")
            (array(0).toInt, array(1))
          }
        }

        // use the cached data inside map
        override def map(value: (Int, String, Int)): (String, String, Int) = {
          val studentId: Int = value._1
          val studentName: String = list.filter(x => studentId == x._1)(0)._2
          // (student id, subject, score) -> (student name, subject, score)
          (studentName, value._2, value._3)
        }
      })

    result.print()
  }
}

📖 10. How data is transferred between Flink tasks, and Operator Chain

10.1 Data exchange strategies
  • Forward strategy

    (1) The output of a task is sent to exactly one task as its input
    (2) If the two tasks run in the same JVM, the network can be avoided entirely

(Figure: forward strategy)

  • Key-based strategy

    (1) The data is grouped (partitioned) by some attribute, which we call the key
    (2) Records with the same key are sent to, and processed by, the same task

(Figure: key-based strategy)

  • Broadcast strategy

    (1) One data set stays in place while the other is copied to every machine that holds a part of the first one. When joining a small data set with a large one, the broadcast-forward strategy can be chosen: the small data set is broadcast, avoiding an expensive repartitioning of the large one.

(Figure: broadcast strategy)

  • Random strategy

    (1) Records are shipped randomly from a task to all the subtasks of the next operator
    (2) This spreads the records evenly over the subtasks, so the load is balanced across tasks

(Figure: random strategy)

PS:

The forward and random strategies can be seen as variants of the key-based strategy: the former preserves the key of the upstream tuple, while the latter reassigns keys randomly. (A rough mapping onto the DataStream API follows below.)
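
As a rough mapping of these strategies onto the DataStream API (the method names are standard; pairing them with the strategies above is an interpretation, and the sample data is a placeholder):

import org.apache.flink.streaming.api.scala._

object ExchangeStrategies {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[(String, Int)] = env.fromElements(("hello", 1), ("spark", 1), ("flink", 1))

    words.forward.map(x => x).print()   // forward strategy: stay with the corresponding downstream subtask
    words.keyBy(0).sum(1).print()       // key-based strategy: the same key always reaches the same subtask
    words.broadcast.map(x => x).print() // broadcast strategy: every record is copied to every downstream subtask
    words.shuffle.map(x => x).print()   // random strategy: records are distributed uniformly at random
    words.rebalance.map(x => x).print() // round-robin rebalancing, commonly used against data skew

    env.execute("ExchangeStrategies")
  }
}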

10.2 Operator Chain
  • Concept

    An operator chain links operators that satisfy certain conditions together so that they run inside the same task. It is one of Flink's job optimizations: within a task, data is handed between chained operators through plain function calls, which saves thread switches, message serialization/deserialization and buffer exchanges, reducing latency while raising overall throughput.

    A typical chain is source -> map -> filter, which can be chained into a single task. So how does Flink decide internally whether operators can be chained?

  • Conditions for operator chaining

    (1) The data exchange strategy is the forward strategy
    (2) The operators run in the same TaskManager
    (3) The upstream and downstream tasks have the same parallelism

  • In our word count program, operator chaining kicks in once the parallelism is set accordingly (a short sketch of the chaining-related API follows below)
    (Figure: the word count job graph with chained operators)
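
Chaining can also be influenced explicitly from the code; a short sketch of the related API calls, with the surrounding job being purely illustrative:

import org.apache.flink.streaming.api.scala._

object ChainingHints {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.disableOperatorChaining()   // turn operator chaining off for the whole job

    env.socketTextStream("node01", 9999)
      .flatMap(line => line.split(" "))
      .map(word => (word, 1)).startNewChain()       // begin a new chain at this operator
      .filter(t => t._1.nonEmpty).disableChaining() // never chain this operator with its neighbours
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("ChainingHints")
  }
}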

⭐️ Type out and run every example yourself
