您的位置:首页 > 游戏 > 手游 > SparkStreaming编程-DStream创建

SparkStreaming编程-DStream创建

2024/12/27 13:19:34 来源:https://blog.csdn.net/weixin_44872470/article/details/139409000  浏览:    关键词:SparkStreaming编程-DStream创建
Spark Streaming 原生支持一些不同的数据源。

RDD队列

用法说明

通过使用ssc.queueStream(queueOfRDDs)来创建DStream,
每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例实操

/** 
需求:循环创建几个 RDD,将 RDD 放入队列。
通过 Spark Streaming创建 Dstream,计算 WordCount
*/import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutableobject RDDQueueDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("RDDQueueDemo").setMaster("local[*]")// 创建SparkStreaming入口对象,且设置5s为时间间隔val scc = new StreamingContext(conf, Seconds(5))// 根据SparkStreaming入口对象创建Spark对象val sc = scc.sparkContext// 创建一个可变队列val queue: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()val rddDS: InputDStream[Int] = scc.queueStream(queue, true)rddDS.reduce(_ + _).print// 开始接收数据并计算scc.start// 循环的方式向队列中添加 RDDfor (elem <- 1 to 5) {queue += sc.parallelize(1 to 100)Thread.sleep(2000)}// 等待计算结束(要么手动退出,要么出现异常)才退出主程序scc.awaitTermination()}
}

自定义数据源

使用及说明

其实就是自定义接收器
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

案例实操

自定义数据源,实现监控某个端口号,获取该端口号内容。
自定义数据源
object MySource{def apply(host: String, port: Int): MySource = new MySource(host, port)
}class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){/*接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.这个函数内部必须初始化一些读取数据必须的资源该方法不能阻塞, 所以 读取数据要在一个新的线程中进行.*/override def onStart(): Unit = {// 启动一个新的线程来接收数据new Thread("Socket Receiver"){override def run(): Unit = {receive()}}.start()}// 此方法用来接收数据def receive()={val socket = new Socket(host, port)val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var line: String = null// 当 receiver没有关闭, 且reader读取到了数据则循环发送给sparkwhile (!isStopped && (line = reader.readLine()) != null ){// 发送给sparkstore(line)}// 循环结束, 则关闭资源reader.close()socket.close()// 重启任务restart("Trying to connect again")}override def onStop(): Unit = {}
}
使用自定义数据源
object MySourceDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")// 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔val ssc = new StreamingContext(conf, Seconds(5))// 2. 创建一个DStreamval lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource("hadoop201", 9999))// 3. 一个个的单词val words: DStream[String] = lines.flatMap(_.split("""\s+"""))// 4. 单词形成元组val wordAndOne: DStream[(String, Int)] = words.map((_, 1))// 5. 统计单词的个数val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)//6. 显示count.print//7. 启动流式任务开始计算ssc.start()//8. 等待计算结束才退出主程序ssc.awaitTermination()ssc.stop(false)}
}
开启端口
nc -lk 10000

Kafka数据源

用法及说明

在工程中需要引入 Maven 依赖 spark-streaming-kafka_2.11来使用它。
包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream。
两个核心类:KafkaUtils、KafkaCluster

导入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.1.1</version>
</dependency>

案例实操

高级API 1

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}object HighKafka {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")val ssc = new StreamingContext(conf, Seconds(3))// kafka 参数//kafka参数声明val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"val topic = "first"val group = "bigdata"val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"val kafkaParams = Map(ConsumerConfig.GROUP_ID_CONFIG -> group,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,)val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))dStream.print()ssc.start()ssc.awaitTermination()}
}

高级API 2

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}object HighKafka2 {def createSSC(): StreamingContext = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")val ssc = new StreamingContext(conf, Seconds(3))// 偏移量保存在 checkpoint 中, 可以从上次的位置接着消费ssc.checkpoint("./ck1")// kafka 参数//kafka参数声明val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"val topic = "first"val group = "bigdata"val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"val kafkaParams = Map("zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",ConsumerConfig.GROUP_ID_CONFIG -> group,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))dStream.print()ssc}def main(args: Array[String]): Unit = {val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck1", () => createSSC())ssc.start()ssc.awaitTermination()}
}

低级API

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}object LowKafka {// 获取 offsetdef getOffset(kafkaCluster: KafkaCluster, group: String, topic: String): Map[TopicAndPartition, Long] = {// 最终要返回的 Mapvar topicAndPartition2Long: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()// 根据指定的主体获取分区信息val topicMetadataEither: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(Set(topic))// 判断分区是否存在if (topicMetadataEither.isRight) {// 不为空, 则取出分区信息val topicAndPartitions: Set[TopicAndPartition] = topicMetadataEither.right.get// 获取消费消费数据的进度val topicAndPartition2LongEither: Either[Err, Map[TopicAndPartition, Long]] =kafkaCluster.getConsumerOffsets(group, topicAndPartitions)// 如果没有消费进度, 表示第一次消费if (topicAndPartition2LongEither.isLeft) {// 遍历每个分区, 都从 0 开始消费topicAndPartitions.foreach {topicAndPartition => topicAndPartition2Long = topicAndPartition2Long + (topicAndPartition -> 0)}} else { // 如果分区有消费进度// 取出消费进度val current: Map[TopicAndPartition, Long] = topicAndPartition2LongEither.right.gettopicAndPartition2Long ++= current}}// 返回分区的消费进度topicAndPartition2Long}// 保存消费信息def saveOffset(kafkaCluster: KafkaCluster, group: String, dStream: InputDStream[String]) = {dStream.foreachRDD(rdd => {var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()// 把 RDD 转换成HasOffsetRanges对val hasOffsetRangs: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]// 得到 offsetRangsval ranges: Array[OffsetRange] = hasOffsetRangs.offsetRangesranges.foreach(range => {// 每个分区的最新的 offsetmap += range.topicAndPartition() -> range.untilOffset})kafkaCluster.setConsumerOffsets(group,map)})}def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")val ssc = new StreamingContext(conf, Seconds(3))// kafka 参数//kafka参数声明val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"val topic = "first"val group = "bigdata"val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"val kafkaParams = Map("zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",ConsumerConfig.GROUP_ID_CONFIG -> group,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)// 读取 offsetval kafkaCluster = new KafkaCluster(kafkaParams)val fromOffset: Map[TopicAndPartition, Long] = getOffset(kafkaCluster, group, topic)val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc,kafkaParams,fromOffset,(message: MessageAndMetadata[String, String]) => message.message())dStream.print()// 保存 offsetsaveOffset(kafkaCluster, group, dStream)ssc.start()ssc.awaitTermination()}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com