您的位置:首页 > 房产 > 建筑 > 详解 Spark 核心编程之累加器

详解 Spark 核心编程之累加器

2025/1/22 22:03:05 来源:https://blog.csdn.net/weixin_44480009/article/details/139387635  浏览:    关键词:详解 Spark 核心编程之累加器

累加器是分布式共享只写变量

一、累加器功能

​ 累加器可以用来把 Executor 端的变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge

在这里插入图片描述

二、累加器类型

1. 系统累加器

/**
常见的系统累加器:longAccumulator/doubleAccumulator/collectionAccumulator
说明:累加器一般放在行动算子中进行操作
*/
object TestRDDAcc {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Acc")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4), 2)// 创建累加器val accSum = sc.longAccumulator("sum")rdd.foreach(num => {accSum.add(num)    })println(accSum.value)sc.stop()}
}

三、自定义累加器

自定义累加器实现 WordCount 案例,避免 shuffle 操作

/**1.继承 AccumulatorV2[IN, OUT] 抽象类,定义输入输出的泛型类型1.1 IN 表述累加器 add 的数据的类型1.2 OUT 表示累加器 value 的返回类型2.重写累加器的抽象方法
*/
object TestAccWordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("WCAcc")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List("hello", "hive", "hello", "spark"))// 创建自定义累加器val wcAcc = new MyAccumulator()// 向 spark 进行注册sc.register(wcAcc, "wordCountAcc")// 循环遍历 rddrdd.foreach(word => {// 使用累加器wcAcc.add(word)    })// 输出累加器的值println(wcAcc.value)sc.stop()}
}/*自定义累加器
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {// 定义累加器的返回结果 Mapprivate var resultMap = mutable.Map[String, Long]()// 判断是否为初始状态override def isZero: Boolean = resultMap.isEmpty()// 复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {this}// 重置累加器override def reset(): Unit = resultMap.clear()// 获取累加器输入的数据进行操作override def add(word: String): Unit = {// 向 resultMap 中添加新值或累加旧值val count = resultMap.getOrElse(word, 0L) + 1resultMap.update(word, count)}// 合并多个累加器的结果override def merge(other:  AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {other.value.foreach({case (word, count) => {val newCount = this.resultMap.getOrElse(word, 0L) + 1this.resultMap.update(word, newCount)}})
}// 返回累加器的结果override def value: mutable.Map[String, Long] = resultMap}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com