您的位置:首页 > 房产 > 建筑 > 新华社最新消息的新闻_企业网站制作的方法_河南疫情最新消息_上百度首页

新华社最新消息的新闻_企业网站制作的方法_河南疫情最新消息_上百度首页

2025/1/11 3:03:30 来源:https://blog.csdn.net/wusuoweiieq/article/details/144986439  浏览:    关键词:新华社最新消息的新闻_企业网站制作的方法_河南疫情最新消息_上百度首页
新华社最新消息的新闻_企业网站制作的方法_河南疫情最新消息_上百度首页

目录

  • 前置内容
    • SparkSQL和HIve区别
    • 数据抽象
  • DataFrame
    • 创建DF createDataFrame(RDD,DF结构)
    • 创建DF RDD.toDF(手动填入列名)
    • 创建DF toDF(样例类)
    • DF转RDD DF.rdd
  • DataSet
    • 创建DataSet 对象(任意类型).toDS
    • DataSet 和RDD转换
    • DataSet 和DataFrame转换
  • 读写文件
    • parquet
    • text
    • JSON
      • 报错提示(已整理至专题)
    • CSV
    • JDBC(Mysql)
  • 数据清洗
    • 数据去重dropDuplicates
    • 计算函数 functions
    • DSL(Domain Specific Language领域特定语言)
    • 自定义函数
      • UDF(User-Defined-Function)
        • 概述
        • 特殊返回值(以数组为例)
      • UDAF(User-Defined Aggregation Function)
  • 开窗函数

前置内容

Spark的一个模块,用于处理结构化数据

特点

  1. 整合SQL、Spark,支持java,scala
  2. 数据源、不同格式的文件的连接获取数据方式相同
  3. 兼容HIve,可以进行整合
  4. 支持JDBC、ODBC

SparkSQL和HIve区别

区别HiveSparkSQL
计算模型磁盘迭代内存迭代计算
元数据管理有元数据管理无元数据管理(但Spark本身有Catalog用于管理元数据)
底层运行框架MapReduceSparkRDD(现在主要使用DataFrame和Dataset API)
SQL支持支持SQL开发支持SQL开发
SQL混合代码开发不支持支持(PySpark, Scala等)
Yarn支持可以运行在Yarn上可以运行在Yarn上

数据抽象

特性/组件SparkCore - RDDSparkSQL - DataFrameSparkSQL - Dataset
引入版本Spark 1.0Spark 1.3Spark 1.6
数据结构无标准数据结构,存储任意类型数据二维表数据结构,每行类型为Row自定义数据结构,每行类型可指定(如case class)
数据访问直接访问存储的数据每列值需通过解析Row获取可直接访问每行各字段
与Spark MLlib关系一般与Spark MLlib同时使用--
SparkSQL支持不支持SparkSQL操作支持SparkSQL操作支持SparkSQL操作
备注-DataFrame是Dataset[Row]的特例提供更高层次的抽象和类型安全

DataFrame

创建DF createDataFrame(RDD,DF结构)

需要定义StructType对象,指定DF所有列名和各自的类型

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object CreateDataFrame {def main(args: Array[String]): Unit = {//1.创建上下文配置文件对象val conf = new SparkConf().setMaster("local[*]").setAppName("CreateDataFrame")//2.创建执行环境入口SparkSession对象/*
* 它的作用是检查当前SparkSession是否已经存在于SparkContext中(在Spark 2.x中,每个SparkContext最多只能有一个活跃的SparkSession)。
* 如果已经存在一个SparkSession,则 .getOrCreate() 方法会返回这个已存在的SparkSession实例。
* 如果不存在,它会根据之前通过 .config(conf) 方法设置的配置信息创建一个新的SparkSession实例,并返回这个新创建的实例。
* 这种设计允许Spark应用程序在需要时轻松地获取或创建一个SparkSession,而无需担心重复创建的问题。* */val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//3.读取文件,映射创建RDD[Row]对象val rdd: RDD[Row] = sparkSession.sparkContext.textFile("data/sql/student.txt").map(_.split(",")).map(array => Row(array(0).toInt, array(1).trim, array(2).toInt))//4.定义StructType对象,指定所有列名和各自的类型val schema: StructType = StructType(StructField("id", IntegerType, false) ::StructField("name", StringType, false) ::StructField("age", IntegerType, true) :: Nil)//5.基于rdd对象转为DataFrameval df: DataFrame = sparkSession.createDataFrame(rdd, schema)//6.打印df的表结构信息df.printSchema()//7.输出df中的数据df.show()//关闭sparkSession.stop()}
}

在这里插入图片描述

创建DF RDD.toDF(手动填入列名)

编写每个列的列名,将RDD中的数据依次填入

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object ToDFDemo1 {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("toDF")//2.创建SparkSession对象val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.添加隐式转换import sparkSession.implicits._//5.读取本地文件,并映射创建RDDval rdd: RDD[(Int, String, Int)] = sparkSession.sparkContext.textFile("data/sql/student.txt")//RDD[String]"1,tom,22"->RDD[Array[String]].map(_.split(","))//RDD[Array[String]]->RDD[(Int, String, Int)].map(arr => (arr(0).toInt, arr(1).trim, arr(2).toInt))//6.通过rdd.toDF(colNames: String*)//val df: DataFrame = rdd.toDF()//了解val df: DataFrame = rdd.toDF("id", "name", "age")//7.输出结构信息df.printSchema()//8.输出df中的数据/**show(numRows: Int, truncate: Boolean)* numRows:表示输出数据的行数,默认是20行.* truncate:表示输出时是否对列的值进行截取*   false:表示不截取*   true:表示截取,保留20个字符*///df.show()//df.show(2,false)df.show(10,false)//3.关闭sparksparkSession.stop()}
}

在这里插入图片描述

创建DF toDF(样例类)

创建一个scala的case class,作为DF的样例类
样例类:

package com.wunaiieq//定义样例类
case class Student(id:Int,name:String,age:Int)

ToDF示例

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}object ToDFDemo2 {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("toDF2")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.添加隐式转换import spark.implicits._//5.读取本地文件,并创建RDD[Student]val rdd: RDD[Student] = spark.sparkContext.textFile("data/sql/student.txt").map(_.split(",")).map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))//6.通过rdd.toDF()转换为DataFrameval df: DataFrame = rdd.toDF()//7.输出df的结构信息和数据信息df.printSchema()df.show()//3.关闭sparkspark.stop()}
}

DF转RDD DF.rdd

DataFrame本质是对RDD的封装
将一个DataFrame转换为RDD时,得到的RDD中的每个元素是一个row对象,表示df的一行,允许使用索引等方式获取这个row中的某个值。
逻辑上来看,可以将返回后的RDD看作一个二维数组

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object DataFrameToRdd {def main(args: Array[String]): Unit = {//1创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataFrameToRdd")//2.创建执行环境入口对象SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//3.读取本地文本文件,映射并创建RDD[Row]对象val rdd: RDD[Student] = spark.sparkContext.textFile("data/sql/student.txt").map(_.split(",")).map(ele => Student(ele(0).toInt, ele(1).trim, ele(2).toInt))//4.添加spark隐式转换import spark.implicits._//5.将Rdd转换为DFval dataFrame: DataFrame = rdd.toDF()//B1.将dataFrame转换为rdd对象val rdd1: RDD[Row] = dataFrame.rdd//B2.通过collect()获取rdd1中的数据val array: Array[Row] = rdd1.collect()//B3.输出结果println(array(1))val name: Any = array(0)(1)println(name.toString)println(array(0).getAs[String]("name"))println(array(0).getString(1))spark.stop()}
}

DataSet

dataset作为一个强类型的数据集合,需要提供对应的类型信息

创建DataSet 对象(任意类型).toDS

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}object CreateDataSet {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CreateDataSet")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.添加隐式转换import spark.implicits._//5.使用基本类型序列对象创建Dataset对象val ds1: Dataset[Int] = Seq(3, 6, 9, 12).toDS()//6.输出ds1的结构和数据ds1.printSchema()ds1.show()//7.使用样例类序列创建Datasetval ds2: Dataset[Student] = Seq(Student(1, "tuhao", 20), Student(2, "diaosi", 21)).toDS()ds2.printSchema()ds2.show()//8.使用样例类List创建Datasetval ds3: Dataset[Student] = List(Student(1, "tuhao", 20), Student(2, "diaosi", 21)).toDS()ds3.printSchema()ds3.show()//3.关闭sparkspark.stop()}
}

在这里插入图片描述
在这里插入图片描述

DataSet 和RDD转换

1.RDD->DataSet
SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。

2.DataSet->RDD
DataSet其实也是对RDD的封装,所以可以直接获取内部的RDD。

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
object DataSetToRDD {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataSetToRDD")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.添加隐式转换import spark.implicits._//5.读取文件并使用样例类创建RDD对象val rdd: RDD[Student] = spark.sparkContext.textFile("data/sql/student.txt")//"1,tom,22"->Array("1","tom","22").map(_.split(","))//Array(1,tom,22)->Student(1,"tom",22).map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))//6.RDD->Datasetval ds: Dataset[Student] = rdd.toDS()ds.printSchema()ds.show()//7.DataSet->RDDval rdd1: RDD[Student] = ds.rdd//8.数组中的元素为Student类的对象,更加方便的获取数据val students: Array[Student] = rdd1.collect()println(students(0))println(students(0).id)println(students(0).name)println(students(0).age)//3.关闭sparkspark.stop()}
}

在这里插入图片描述

DataSet 和DataFrame转换

DF是DS的特例,因此他们之间允许互相转换

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object DataFrameAndDataSet {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataFrameAndDataSet")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.添加隐式转换import spark.implicits._//5.读取文件并使用样例类创建RDD对象val rdd: RDD[Student] = spark.sparkContext.textFile("data/sql/student.txt")//"1,tom,22"->Array("1","tom","22").map(_.split(","))//Array(1,tom,22)->Student(1,"tom",22).map(arr => Student(arr(0).toInt, arr(1), arr(2).toInt))//B1.RDD->DataFrameval df: DataFrame = rdd.toDF()//B2.DataFrame->DataSetval ds: Dataset[Student] = df.as[Student]println("ds:"+ds)//B3.DataSet->DataFrameval df1: DataFrame = ds.toDF()println("df1:"+df1)//B4.RDD->DataSetval ds1: Dataset[Student] = rdd.toDS()println("ds1:"+ds1)//B5.DataSet->RDDval rdd1: RDD[Student] = ds.rdd//B6.DataFrame->RDDval rdd2: RDD[Row] = df.rdd//3.关闭sparkspark.stop()}
}

在这里插入图片描述

读写文件

parquet

Parquet 是一种二进制文件格式,用于高效地存储和处理大规模数据集
idea无法直接读取此文件,可以下载一个插件
在这里插入图片描述
如果缺少parquet的示例文件,可以参考博客
博客

package com.wunaiieq.fileimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object SSRWParquet {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSRWParquet")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.读取本地parquet文件,返回DataFrame对象//4.1方式一//format("parquet/csv/json/text/jdbc")指定读取文件的格式val df: DataFrame = spark.read.format("parquet").load("data/sql/student.parquet")//4.2方式二//由于默认的读取的文件格式为parquet,所以还可以省略format("parquet")//如果spark.sql.sources.default被修改过,不能省略format("parquet")/*val df: DataFrame = spark.read.load("data/sql/student.parquet")*///4.3方式三
//    val df: DataFrame = spark.read
//      .parquet("data/sql/student.parquet")df.printSchema()df.show()//5.写文件//   df.write.format("parquet")//    .save("data/sqlout/parquet")//由于默认的读取的文件格式为parquet,所以还可以省略format("parquet")//如果spark.sql.sources.default被修改过,不能省略format("parquet")//df.write.save("data/sqlout/parquet")//3.关闭sparkspark.stop()}
}

在这里插入图片描述
在这里插入图片描述

text

text文件实际是txt文本文件,操作方法比较多,但最后基本都是调用format这个方法,记住一个就行

package com.wunaiieq.fileimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object SSRText {def main(args: Array[String]): Unit = {//创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSRWParquet")//创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//format("parquet/csv/json/text/jdbc")指定读取文件的格式val df: DataFrame = spark.read.format("text").load("data/sql/student.txt")df.printSchema()df.show()//5.写文件//append 如果存在则追加//overwrite 存在则覆盖//error 存在则抛出异常//ignore 文件存在则忽略df.write.mode("append").format("text").save("data/sqlout/txt")//3.关闭sparkspark.stop()}
}

JSON

package com.itbaizhan.sqlimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}object SSRWJson {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSRWJson")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.读取本地json文件,返回DataFrame对象val df: DataFrame = spark.read.format("json")//设置被读取文件的字符集编码.option("encoding", "utf-8").load("data/sql/student.json")//5.输出相关信息df.printSchema()df.show()//6.创建临时视图df.createTempView("student")//7.使用临时视图进行查询val dataFrame: DataFrame = spark.sql("""|select id,name,age|from student|where age between 23 and 50|""".stripMargin)dataFrame.show()
//    //8.写json文件(需要配置Hadoop)
//    df.write.mode("overwrite")
//      .format("json")
//      .save("data/sqlout/json")
//    //或者如下方式
//    df.write.mode("overwrite").json("data/sqlout/json")//3.关闭sparkspark.stop()}
}

报错提示(已整理至专题)

Exception in thread “main” org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). 。。。。。
JSON文件必须紧凑排列,不要使用idea格式化
下面是几种JSON文件的格式,可以参考下,建议使用第4种,但此JSON文件idea会有格式错误的提示
JSON文件格式1
在这里插入图片描述
输出效果
在这里插入图片描述
JSON文件格式2
在这里插入图片描述
在这里插入图片描述
JSON文件格式3
在这里插入图片描述
在这里插入图片描述
JSON文件格式4
在这里插入图片描述
在这里插入图片描述

CSV

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object SSRWCsv {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSRWCsv")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.读取本地csv文件,返回DataFrame对象val df: DataFrame = spark.read.format("csv")//第一行为列名.option("header", true)//设置字段之间的分隔符,默认是“,”.option("delimiter", ";")//未设置前各个字段都是String类型,设置后匹配对应的类型.option("inferSchema","true").option("encoding", "utf-8")//.load("data/sql/student.csv").load("data/sql/student.csv")//val df: DataFrame = spark.read.csv("data/sql/student2.csv")df.printSchema()df.show()//5写文件df.write.mode(SaveMode.Overwrite).option("header","true").csv("data/sqlout/csv")//3.关闭sparkspark.stop()}
}

在这里插入图片描述

JDBC(Mysql)

读取数据

package com.wunaiieq.fileimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.util.Properties
object SSJdbcRead {def main(args: Array[String]): Unit = {//1创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("jdbcRead")//2.创建执行环境入口对象SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()// 3.方式一:通过jdbc读取mysql数据库tmp_sql中的student//3.1.option("url", "jdbc连接的url字符串")//dbc:mysql://192.168.20.101:3306/tmp_sql? => jdbc:mysql://ip:port/dbname?//useSSL=false 是否使用SSL安全协议进行连接  false表示不使用,true表示使用//&useUnicode=true&characterEncoding=utf8传输数据使用字符集//编码,确保数据在传输过程中不出现乱码问题//   3.2.option("dbtable", "student")设置操作的表名称//   3.3.option("user", "root") 连接mysql数据库的用户名//   3.4.option("password", "password") 连接mysql数据库的密码//   3.5.load() 无参数  不需要指定具体的路径val df: DataFrame = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.16.100:3306/tmp_sql?useSSL=false&useUnicode=true&characterEncoding=utf8").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "password").option("dbtable", "student").load()//4.方式二
//    val prop: Properties = new Properties()
//    prop.put("user", "root")
//    prop.put("password", "123456")
//    prop.put("driver", "com.mysql.jdbc.Driver")
//    val df: DataFrame = spark.read.jdbc("jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8",
//      "my_score", prop)df.printSchema()df.show()spark.stop()}
}

在这里插入图片描述
在这里插入图片描述

写入数据
创建一个样例类,作为写入模板
样例类

package com.wunaiieq//定义成绩的样例类
case class Student(id:Int,name:String,age:Int)

写入类

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}import java.util.Properties
object SSJdbcWrite {def main(args: Array[String]): Unit = {//1创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("jdbcWrite")//2.创建执行环境入口对象SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//3.添加隐式转换import spark.implicits._//4.使用样例类序列创建DataSetval ds: Dataset[Student] = Seq(Student(13,"A",15),Student(12,"B",12),Student(9,"C",83)).toDS()//5.将DataSet对象ds中数据写入到mysql的test实例的my_score表中//5.1方式一//A.mode("append") append表示追加写入,overwrite:表示覆盖写//B.format("jdbc") 使用jdbc协议和mysql进行通信//C.save() 无参数ds.write.mode("append").format("jdbc").option("url", "jdbc:mysql://192.168.16.100:3306/tmp_sql?useSSL=false&useUnicode=true&characterEncoding=utf8").option("user", "root").option("driver", "com.mysql.jdbc.Driver").option("password", "password").option("dbtable", "student").save()
//    //5.2方式二
//    val prop: Properties = new Properties()
//    prop.put("user", "root")
//    prop.put("password", "123456")
//    prop.put("driver", "com.mysql.jdbc.Driver")
//    ds.write.mode("append")
//      .jdbc("jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8",
//        "my_score",prop)spark.stop()}
}

数据清洗

数据去重dropDuplicates

有参数:保留的是每个 id 第一次出现的记录。如果有多个记录具有相同的 id 但不同的 name,则具体保留哪个 name 取决于哪个记录首先被处理,这通常与数据的分区和排序方式有关。

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object SSDropDuplicates {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSDropDuplicates")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.读取本地csv文件,返回DataFrame对象val df: DataFrame = spark.read.format("csv")//第一行为列名.option("header", true)//设置字段之间的分隔符,默认是“,”.option("delimiter", ",")//未设置前各个字段都是String类型,设置后匹配对应的类型.option("inferSchema","true").option("encoding", "utf-8").load("data/sql/student.csv")df.printSchema()df.show()println("---------无参数去重---------")//5,无参数去重,将所有列联合起来进行比较,只保留一条(第一条)df.dropDuplicates().show()//6.有参数去重,指定字段进行去重,保留的是每个 id 第一次出现的记录。如果有多个记录具有相同的 id 但不同的 name,则具体保留哪个 name 取决于哪个记录首先被处理,这通常与数据的分区和排序方式有关。println("---------指定字段进行去重---------")df.dropDuplicates("id").show()//3.关闭sparkspark.stop()}
}

计算函数 functions

此处以explode为例
其他函数说明点此

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._object SSFuntions {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSFuntions")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()import spark.implicits._//4.读取本地文件val df1: DataFrame = spark.read.text("data/sql/words.txt")//5."hello tom"=> "hello","tom"val colSplit: Column = split(df1("value"), " ")//6."hello","tom"=> "hello"//          "tom"val explodeColumn: Column = explode(colSplit)//7.对已经存在的列进行操作,返回一个新的列val df2: DataFrame = df1.withColumn("value", explodeColumn)//8.分组统计单词出现的次数,并降序排列df2.groupBy("value").count()//为列名重命名.withColumnRenamed("value","word").withColumnRenamed("count","cnt")//排序,按照单词出现的数量的倒叙排序.sort($"cnt".desc).show()//3.关闭sparkspark.stop()}
}

DSL(Domain Specific Language领域特定语言)

DSL:允许用户以类似 SQL 的方式在 Spark 中表达数据处理逻辑
以下为一个简单示例

package com.wunaiieqimport org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, DataFrame, SparkSession}object SSSQlApi {def main(args: Array[String]): Unit = {//1.创建配置文件对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SSSQlApi")//2.创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()//4.读取本地csv文件,返回DataFrame对象val df: DataFrame = spark.read.format("csv")//第一行为列名.option("header", true)//设置字段之间的分隔符,默认是“,”.option("delimiter", ",")//未设置前各个字段都是String类型,设置后匹配对应的类型.option("inferSchema","true").option("encoding", "utf-8").load("/input/student.csv")//5.将df注册为一个临时视图(表),只能在当前的SparkSession对象中使用df.createTempView("tb_student")//再次注册同名的视图,抛出异常TempTableAlreadyExistsException//Temporary view 'tb_score' already exists//df.createTempView("tb_score")//7.注册或替换临时视图 不存在则注册,存在则替换df.createOrReplaceTempView("tb_student")//8.注册全局的临时视图df.createOrReplaceGlobalTempView("tb_student")//6.执行查询操作spark.sql("select id,name,age from tb_student").show()//3.关闭sparkspark.stop()}
}

自定义函数

UDF(User-Defined-Function)

概述

特性
一进:
输入:UDF 接受一个或多个输入参数。这些参数通常是 DataFrame 中的列。
含义:在“一进”的情况下,UDF 接收一个输入参数,例如一个列的值。这个输入参数可以是任何类型,具体取决于 UDF 的定义和用途。
一出:
输出:UDF 返回一个输出值。这个输出值通常是基于输入参数计算得到的。
含义:在“一出”的情况下,UDF 输出一个单一的结果,这个结果可以是任何类型,如整数、浮点数、字符串、数组等,具体取决于 UDF 的实现和预期用途。

参数1:UDF名称,可被用于SparkSQL的sql语句中
参数2:被注册成UDF的方法
参数3:声明UDF的返回值类型
sparkSession.udf.register(参数1,参数2,参数3)

package com.wunaiieq
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunctionobject UserDefUDF {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local[*]").appName("udf1").getOrCreate()//3.案例1:无参数数的自定义函数val randomUDFObj: UserDefinedFunction = udf(() => Math.random())spark.udf.register("random",randomUDFObj)println("无参数UDF:random")spark.sql("select random()").show()//4.案例2.一个参数的自定义函数val plusOneUDFObj: UserDefinedFunction = udf((x: Int) => x + 1)spark.udf.register("plus_one",plusOneUDFObj)println("1个参数UDF:加一")spark.sql("select plus_one(5)").show()//5.案例3.两个参数的自定义函数spark.udf.register("str_len2",(str:String,num:Int)=>str.length+num)println("2个参数UDF:字符串长度+给定值")spark.sql("select str_len2('test',1)").show()//6.案例4.用在where语句中的UDFspark.udf.register("arg_filter",(n:Int)=>n>5)//设置一个给定的test表spark.range(1,10).createOrReplaceTempView("test")spark.sql("select * from test").show()//在给定表test种进行UDF查询println("where语句中的UDF:在给定表种查询")spark.sql("select * from test where arg_filter(id)").show()//7.案例5.其他SQL语句中val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")//隐式转换import spark.implicits._//转换DF对象val nameDF: DataFrame = nameList.toDF("name")//将df注册为临时视图nameDF.createOrReplaceTempView("students")spark.udf.register("str_len",(name:String)=>name.length)//使用udf函数 strLenspark.sql("""select name, str_len(name) as length from students order by length desc""").show()//2.关闭spark.close()}
}

在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

特殊返回值(以数组为例)
package com.wunaiieqimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{ArrayType, StringType}object UserDefUDF2 {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local[*]").appName("udf2").getOrCreate()//2.初始化一个List集合val lineList: List[String] = List[String]("python java scala", "spark core sql streaming")import spark.implicits._//3.转换为DF对象val lineDF: DataFrame = lineList.toDF("line")//4.注册临时视图lineDF.createOrReplaceTempView("words")//5.自定义函数spark.udf.register("split_space",(line:String)=>line.split(" "),ArrayType(StringType))//spark.udf.register("split_space",(line:String)=>line.split(" "))//6.使用自定义函数spark.sql("""select line,split_space(line) as arr from words""").show()//2.关闭spark.close()}
}

在这里插入图片描述

UDAF(User-Defined Aggregation Function)

以下为计算流程图(新标签页打开)
具体的操作方法类似于hadoop中的mapreduce
在这里插入图片描述

主函数

package com.wunaiieqimport org.apache.spark.sql.{DataFrame, SparkSession, functions}object UserDefUDAFNew {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local[*]").appName("udafNew").getOrCreate()//3.读取json文件val df: DataFrame = spark.read.json("data/sql/student.json")//4.注册临时视图df.createOrReplaceTempView("tb_student")//5.注册自定义UDAF函数spark.udf.register("my_avg",functions.udaf(new MyUDAF()))//6.调用自定义的udaf函数查询每个科目,以及该科目的平均分spark.sql("""select class,my_avg(age) from tb_student group by class""").show()//2.关闭spark.close()}
}

样例类

package com.wunaiieq//自定义样例类,sum聚合,cnt数量
case class MyBuf(var sum:Int,var cnt:Int)

**MyUDAF类 **

package com.wunaiieqimport com.wunaiieq.MyBuf
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregatorimport scala.collection.immutable.Range//自定义类MyUDAF
/**@param Int 进来的参数类型* @param Double 输出的参数类型* @param MyBuf* */
class MyUDAF extends Aggregator[Int,MyBuf,Double]{//赋初始化的值 0,0override def zero: MyBuf = MyBuf(0,0)/**Map节点中的reduce操作将同一个分区下的数据进行分组、聚合* @param K 表示班级class,用于分组,* @param V 表示传入值,年龄* @return MyBuf对象K* */override def reduce(K: MyBuf, V: Int): MyBuf = {K.sum += VK.cnt += 1K}/**@param Final_K 表示最终汇总的对象* @param k 表示每次输入的小对象,需要将小对象的值一一聚合到K中* @return Final_K最终返回的对象,内部已经聚合了所有小对象的值* *///reduce端将同一个分组的数据进行聚合override def merge(Final_K: MyBuf, k: MyBuf): MyBuf = {Final_K.sum += k.sumFinal_K.cnt += k.cntFinal_K}/**逻辑计算: 此处的逻辑计算为 Final_K的sum值 / Final_K的cnt值* @param Final_K 最终的对象,已经聚合所有的计算结果,只差最后的逻辑计算* @return 返回逻辑计算的结果** *///聚合后每组数据得到一个MyBuf对象,然后[再做最后的计算]并返回结果override def finish(Final_K: MyBuf): Double= Final_K.sum.toDouble/Final_K.cnt//在中间计算时,每次返回的都是样例类型的对象,所以类型写入MyBuf//中间结果的序列化,元组或样例类调用Encoders.product进行序列化override def bufferEncoder: Encoder[MyBuf] = Encoders.product//最终结果的序列化,内部类型为最终的结果类型override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

开窗函数

描述
既显示聚集前的数据又显示聚集后的数据

package com.wunaiieqimport org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}object OpenWindowFunction {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local[*]").appName("OpenWindowFunction").getOrCreate()//3.读取json文件val df: DataFrame = spark.read.json("data/sql/student.json")//4.注册视图df.createOrReplaceTempView("tb_student")//5.聚合开窗函数//5.1.在每行信息后面显示所有人的平均年龄println("在每行信息后面显示所有人的平均年龄")spark.sql("""select id,name,A,B,avg(age) over() as avg_age from tb_student""").show()//5.2.在每行信息后面显示各个班级的平均年龄println("在每行信息后面显示各个班级的平均年龄")spark.sql("""select id,name,A,B,avg(age) over(partition by class) as avg_age from tb_student""").show()//6.排序开窗函数//6.1row_number排序开窗函数:值同名次不同,序号不间断println("------row_number-----")spark.sql("""select id,name,A,B,row_number() over(order by id desc) as rn from tb_student""").show()//6.2 dense_rank:值同名次同,序号不间断println("------dense_rank-----")spark.sql("""select id,name,A,B,dense_rank() over(order by id desc) as dr from tb_student""").show()
//    //6.3. rank: 值同名次同,序号间断println("------rank-----")spark.sql("""select id,name,A,B,rank() over(order by id desc) as rk from tb_student""").show()//2.关闭spark.close()}
}

最后一列的avg_age为计算结果,加入到表格中一起显示,这就是窗口函数的作用
在这里插入图片描述在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com