您的位置:首页 > 新闻 > 热点要闻 > 国内新闻最新消息10条简短2022_苏州网络推广企业_谷歌浏览器下载手机版_网站推广的方式有

国内新闻最新消息10条简短2022_苏州网络推广企业_谷歌浏览器下载手机版_网站推广的方式有

2024/12/28 17:44:24 来源:https://blog.csdn.net/2301_80912559/article/details/144011244  浏览:    关键词:国内新闻最新消息10条简短2022_苏州网络推广企业_谷歌浏览器下载手机版_网站推广的方式有
国内新闻最新消息10条简短2022_苏州网络推广企业_谷歌浏览器下载手机版_网站推广的方式有

1. 前言

由于大多数Spark计算的内存性质,Spark程序可能会受到集群中任何资源(CPU,网络带宽或内存)的瓶颈。通常,如果内存资源足够,则瓶颈是网络带宽。

数据序列化,这对于良好的网络性能至关重要。

在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作。比如:

1)分发给Executor上的Task

2)广播变量

3)Shuffle过程中的数据缓存

等操作,序列化起到了重要的作用,将对象序列化为慢速格式或占用大量字节的格式将大大减慢计算速度。通常,这是优化Spark应用程序的第一件事。

spark 序列化分两种:一种是Java 序列化; 另一种是 Kryo 序列化

2. Java序列化

定义UserInfo类

public class UserInfo{private String name = "hainiu"; // java实现了序列化private int age = 10;  // java实现了序列化private Text addr = new Text("beijing");  // 没有实现java的 Serializable接口public UserInfo() {}@Overridepublic String toString() {return "UserInfo{" +"name='" + name + '\'' +", age=" + age +", addr=" + addr +'}';}
}

java实现序列化的一般方法:

1)让类实现Serializable接口

当使用Serializable方案的时候,你的对象必须继承Serializable接口,类中的属性如果有实例那也必须是继承Serializable 可序列化的;

package com.hainiu.sparkcore
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SerDemo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SerDemo")val sc = new SparkContext(conf)val rdd: RDD[String] = sc.parallelize(List("aa","aa","bb","aa"),2)val broad: Broadcast[UserInfo] = sc.broadcast(new UserInfo)val pairRdd: RDD[(String, UserInfo)] = rdd.map(f => {val userInfo: UserInfo = broad.value(f, userInfo)})// 因为groupByKey有shuffle,需要序列化val groupRdd: RDD[(String, Iterable[UserInfo])] = pairRdd.groupByKey()val arr: Array[(String, Iterable[UserInfo])] = groupRdd.collect()for(t <- arr){println(t)}}
}

2)static和transient修饰的属性不会被序列化,可以通过在属性上加 static 或 transient 修饰来解决序列化问题。

static修饰的是类的状态,而不是对象状态,所以不存在序列化问题;

这样导致数据丢失。

给addr 属性用 transient 修饰,导致反序列化后数据丢失

java 序列化弊端:

1)如果引入第三方类对象作为属性,如果对象没有实现序列化,那这个类也不能序列化;

2)用 transient 修饰 的属性,反序列化后数据丢失;

3)Java序列化很灵活(支持所有对象的序列化)但性能较差,同时序列化后占用的字节数也较多(包含了序列化版本号、类名等信息);

3. Kryo 序列化

由于java序列化性能问题,spark 引入了Kryo序列化机制。

Spark 也推荐用 Kryo序列化机制。Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便。

1)开启序列化

spark 默认序列化方式 是 用java序列化。

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // classOf[KryoSerializer].getName 一样效果

2)配置序列化参数

当开启序列化后,需要配置 【spark.kryo.registrationRequired】属性为true,默认是false,如果是false,Kryo序列化时性能有所下降。

注册有两种方式:

第一种:

    // 开启Kryo序列化conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 要求主动注册conf.set("spark.kryo.registrationRequired", "true")// 方案1:val classes: Array[Class[_]] = Array[Class[_]](classOf[UserInfo],classOf[Text],Class.forName("scala.reflect.ClassTag$GenericClassTag"),classOf[Array[UserInfo]])//将上面的类注册conf.registerKryoClasses(classes)

第二种:

封装一个自定义注册类,然后把自定义注册类注册给Kryo。

a)自定义注册类:

class MyRegistrator extends KryoRegistrator {override def registerClasses(kryo: Kryo): Unit = {kryo.register(classOf[UserInfo])kryo.register(classOf[Text])kryo.register(Class.forName("scala.reflect.ClassTag$GenericClassTag"))kryo.register(classOf[Array[UserInfo]])}
}

b)配置自定义注册类

    // 开启Kryo序列化conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 要求主动注册conf.set("spark.kryo.registrationRequired", "true")// 设置注册类conf.set("spark.kryo.registrator",classOf[MyRegistrator].getName)

代码:

package spark05import com.esotericsoftware.kryo.Kryo
import java05.UserInfo
import org.apache.hadoop.io.Text
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.{SparkConf, SparkContext}object SerDemo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SerDemo")// 开启Kryo序列化conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 要求主动注册conf.set("spark.kryo.registrationRequired", "true")// 方案1:
// val classes: Array[Class[_]] = Array[Class[_]](
// classOf[UserInfo],
// classOf[Text],
// Class.forName("scala.reflect.ClassTag$GenericClassTag"),
// classOf[Array[UserInfo]]
// )//将上面的类注册
// conf.registerKryoClasses(classes)// 方案2conf.set("spark.kryo.registrator",classOf[MyRegistrator].getName)val sc = new SparkContext(conf)val rdd: RDD[String] = sc.parallelize(List("aa","aa","bb","aa"),2)val user = new UserInfoval broad: Broadcast[UserInfo] = sc.broadcast(user)val rdd2: RDD[(String, UserInfo)] = rdd.map(f => {val user2: UserInfo = broad.value(f, user2)})// 目的是让rdd产生shuffleval arr: Array[(String, Iterable[UserInfo])] = rdd2.groupByKey().collect()arr.foreach(println)}
}class MyRegistrator extends KryoRegistrator {override def registerClasses(kryo: Kryo): Unit = {kryo.register(classOf[UserInfo])kryo.register(classOf[Text])kryo.register(Class.forName("scala.reflect.ClassTag$GenericClassTag"))kryo.register(classOf[Array[UserInfo]])}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com