简介
Hadoop与Spark-SQL的对比
Hadoop在处理结构化数据方面存在局限性,无法有效处理某些类型的数据。
Spark应运而生,特别设计了处理结构化数据的模块,称为Spark SQL(原称Shark)。
SparkSQL的发展历程:
SparkSQL的前身Shark,它是伯克利实验室Spark生态组件之一,基于Hive 所开发的工具。Shark 的出现,使得 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高。
随着Spark的发展,Shark逐渐被Spark SQL取代,Spark SQL不再依赖Hive,兼容性更强。
Spark SQL与Hadoop的关系
SparkSQL作为Spark生态的一员继续发展,不再受限于 Hive,只是兼容 Hive。 Spark 作为 Hive 的底层引擎之一,也就是说,Hive 将不再受限于一个引擎,可以采用 Map-Reduce、Tez、Spark 等引擎。
Spark-SQL的特点
整合性:无缝整合Spark查询和编程。
统一数据访问:使用相同方式连接不同数据源。
兼容性:兼容Hive,可在已有仓库上直接运行SQL和HQL。
标准数据连接:支持JDBC和ODBC连接。
DataFrame与DataSet
DataFrame:类似于表格,包含列名和行索引,适合结构化数据。
DataSet:强类型集合,必须指定数据类型。
将 DataFrame 转换为 DataSet。Row 是一个类型,跟 Car、Person 这些的类型一样,所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序
核心编程
解决类加载报错问题:
遇到类加载报错的解决方法:
检查对象名和类名是否一致
确认代码编辑器没有卡顿,并且新建类时选择正确
设置代码为根目录
DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。
创建 DataFrame
在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame
有三种方式:通过 Spark 的数据源进行创建;
从一个存在的 RDD 进行转换;还可以从 Hive
Table 进行查询返回。
从 Spark 数据源进行创建
Spark-SQL支持的数据类型:
在 spark 的 bin/data 目录中创建 user.json 文件
{"username":"zhangsan","age":20}
{"username":"lisi","age":17}
读取 json 文件创建 DataFrame
val df = spark.read.json("data/user.json")(进入的是自己放入的目录)
需要先进入这个里面:
注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和 Long 类型转换,但是和 Int 不能进行转换。
SQL 语法
SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要
有临时视图或者全局视图来辅助
读取 JSON 文件创建 DataFrame
val df1 = spark.read.json("data/user.json")
对 DataFrame 创建一个临时表
df1.createOrReplaceTempView("people")
通过 SQL 语句实现查询全表
val sqlDF = spark.sql("select * from people")
结果展示
sqlDF.show
对于 DataFrame 创建一个全局表
df1.createGlobalTempView("people1")
(全局不要随便弄,怕跟自己之前弄过的名称重了,到时会报错)
第一次运行全局表会出现这个错误,需要将虚拟中的hive-site.xml文件复制到spark的conf路径下(最好能将整个hive目录放在本地文件系统中)
配置完成之后再执行语句:
df1.createGlobalTempView("people1")
通过 SQL 语句实现查询全表
spark.sql("SELECT * FROM global_temp.people1").show()
spark.newSession().sql("SELECT * FROM global_temp.people1").show()
DSL 语法
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。 可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了
创建一个 DataFrame
val df = spark.read.json("data/user.json")
查看 DataFrame 的 Schema 信息
df.printSchema
只查看"username"列数据
df.select("username").show()
查看"username"列数据以及"age+1"数据
注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
df.select($"username",$"age" + 1).show
df.select('username, 'age + 1).show()
查看"age"大于"18"的数据
df.filter($"age">18).show
按照"age"分组,查看数据条数
df.groupBy("age").count.show
RDD 转换为 DataFrame
val idRDD = sc.textFile("data/id.txt")
idRDD.toDF("id").show
DataSet
创建 DataSet
使用样例类序列创建 DataSet
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS.show
使用基本类型的序列创建 DataSet
val ds = Seq(1,2,3,4,5).toDS
ds.show
注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet
RDD 转换为 DataSet
SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结 构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。
case class User(name:String, age:Int)
sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
DataSet 转换为 RDD
DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD
case class User(name:String, age:Int)
sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS
val rdd = res3.rdd(这个res后面跟的数是根据上一个代码出来的结果里的)
rdd.collect
DataFrame 转换为 DataSet
case class User(name:String, age:Int)
val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")
val ds = df.as[User]
创建和转换数据集
使用样例类创建数据集:通过 case class
定义类和字段类型。
添加数据后,默认是 RDD,需转换为数据集。
使用基本类型创建数据集:直接使用 DataSet
创建数据集。
RDD 和数据集的转换:RDD 转数据集:使用 toDS()
方法。
数据集转 RDD:使用 rdd
属性。
RDD、DataFrame 和 DataSet 的关系
共性:
都是 Spark 平台下的分布式弹性数据集。
都有惰性机制,只有在行动算子触发时才执行。
都支持自动缓存和分区概念。
都可以使用模式匹配获取字段值和类型。
区别:
RDD 支持 MLLIB,常用于机器学习。
DataFrame 和 DataSet 支持 Spark SQL 操作。
DataFrame 每行类型固定为 ROW,不支持直接访问字段值。
DataSet 是强类型的,每行数据类型明确。
RDD、DataFrame 和 DataSet 的相互转换
RDD 转 DataFrame:使用 toDF()
方法。
DataFrame 转 RDD:使用 rdd
属性。
DataFrame 转 DataSet:使用 as[Type]
方法。
DataSet 转 DataFrame:使用 toDF()
方法。
RDD 转 DataSet:使用 toDS()
方法。