您的位置:首页 > 游戏 > 手游 > 有哪些网站免费学习建设网站的_seo做关键词怎么收费的_长沙网址seo_网络推广100种方式

有哪些网站免费学习建设网站的_seo做关键词怎么收费的_长沙网址seo_网络推广100种方式

2024/10/13 3:55:43 来源:https://blog.csdn.net/lu070828/article/details/142494006  浏览:    关键词:有哪些网站免费学习建设网站的_seo做关键词怎么收费的_长沙网址seo_网络推广100种方式
有哪些网站免费学习建设网站的_seo做关键词怎么收费的_长沙网址seo_网络推广100种方式

一、示例代码

我们从一句常用的sql开始分析

import spark.implicits._
import spark.sqlsql("SELECT * FROM ods.personal_info limit 10").show()

其中包含两个函数调用,sql()和show(),我们依次来分析下

二、sql()

1、SparkSession

它是使用Dataset和DataFrame API对Spark编程的入口点

  //使用Spark执行SQL查询,将结果作为“DataFrame”返回。def sql(sqlText: String): DataFrame = {Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))}

这里要分析来两处:

        1、sessionState.sqlParser.parsePlan(sqlText)

        2、Dataset.ofRows()

我们先看下第1处,它是通过ParserInterface调用其子类AbstractSqlParser来实现

2、AbstractSqlParser

  //为给定的SQL字符串创建逻辑计划override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>//获取将ParseTree转换为AST的构建器(访问者) //这里用到了ANTLR的知识astBuilder.visitSingleStatement(parser.singleStatement()) match {case plan: LogicalPlan => plancase _ =>val position = Origin(None, None)throw QueryParsingErrors.sqlStatementUnsupportedError(sqlText, position)}}protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {logDebug(s"Parsing command: $command")//1、将sql转换成字符流//2、将字符流全部转换成大写,这大大简化了对流的词法分析,同时我们可以保持原始命令//3、词法分析是基于Hive的org.apache.hadoop.hive.ql.parse.ParseDriver.ANTLRNoCaseStringStream//4、Hive词法分析是基于ANTLR4val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))lexer.removeErrorListeners()lexer.addErrorListener(ParseErrorListener)//使用指定的令牌源和默认令牌通道 构造一个新的 CommonTokenStreamval tokenStream = new CommonTokenStream(lexer)//解析val parser = new SqlBaseParser(tokenStream)parser.addParseListener(PostProcessor)parser.removeErrorListeners()parser.addErrorListener(ParseErrorListener)//spark.sql.legacy.setopsPrecedence.enabled  默认false//当设置为true并且括号未指定求值顺序时,设置操作将在查询中从左向右执行。当设置为false且括号未指定求值顺序时,INTERSECT操作将在任何UNION、EXCEPT和MINUS操作之前执行。parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced//spark.sql.legacy.exponentLiteralAsDecimal.enabled 默认值 false//当设置为true时,具有指数的文字(例如1E-30)将被解析为Decimal而不是Double。parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled//spark.sql.ansi.enabled   默认 false//如果为true,Spark SQL将使用符合ANSI的方言,而不是符合Hive的方言。//parser.SQL_standard_keyword_behavior = conf.ansiEnabledtry {try {// 首先,尝试使用可能更快的SLL模式进行解析parser.getInterpreter.setPredictionMode(PredictionMode.SLL)toResult(parser)}catch {case e: ParseCancellationException =>// 如果失败,则使用LL模式解析tokenStream.seek(0) // 倒带输入流parser.reset()// 重试parser.getInterpreter.setPredictionMode(PredictionMode.LL)toResult(parser)}}catch {//......}}

接下来我们看看第2步:Dataset.ofRows()

3、Dataset.ofRows()

private[sql] object Dataset {def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {//根据逻辑计划获取一个QueryExecution //使用Spark执行关系查询的主要工作流程。旨在让开发人员轻松访问查询执行的中间阶段。val qe = sparkSession.sessionState.executePlan(logicalPlan)//断言分析qe.assertAnalyzed()//最后构建一个row类型的Dataset也就是DataFrame返回new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))}
}

我们先整体看看这个逻辑:

        1、数据方面:根据执行计划获取QueryExecution,并对该执行计划进行分析(只是分析这一步时懒惰的,需要真正触发执行时才校验)

        2、schema:根据数据构建RowEncoder将数据进行类型转换,适配程序

接下来我们看看qe.assertAnalyzed()都做了什么

4、QueryExecution

  def assertAnalyzed(): Unit = {// Analyzer在try块外调用,以避免在下面的catch块内再次调用它//它是一个懒执行的方法,只有触发action算子时才会执行analyzedtry {//这里会对sql做分析校验sparkSession.sessionState.analyzer.checkAnalysis(analyzed)} catch {case e: AnalysisException =>//解析异常,这里会对sql进行解析并根据不同规则约束抛出不同的异常}}lazy val analyzed: LogicalPlan = {SparkSession.setActiveSession(sparkSession)sparkSession.sessionState.analyzer.execute(logical)}

接下来我们看看是如何对sql进行检查的

5、CheckAnalysis

当sql无法分析时,抛出面向用户的错误。

  def checkAnalysis(plan: LogicalPlan): Unit = {//我们对规则进行升级和排序,以捕捉第一个可能的失败,而不是级联解决失败的结果。//这里就不展开了,这里列举几个//1、跳过已分析的子计划//2、逻辑计划不应具有char/varchar类型的输出//3、Namespace 、Table、View、Hint等不存在//4、将 Table 的操作用在了 View 上 //5、表没有分区、不支持分区等等//......plan.foreachUp (......)//度量指标操作checkCollectedMetrics(plan)//覆盖以提供额外检查以进行正确分析。这些规则将在我们的内置检查规则之后进行评估。extendedCheckRules.foreach(_(plan))//如果有解析异常直接将错误抛给用户plan.foreachUp {case o if !o.resolved =>failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")case _ =>}//递归地将此计划树中的所有节点标记为已分析plan.setAnalyzed()}

在构建Dataset时还需要构建一个RowEncoder,下面我们就来看看它

6、RowEncoder

它用来处理Spark SQL类型与其允许的外部类型之间的映射,比如:

BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal or scala.math.BigDecimal or Decimal

DateType -> java.sql.Date if spark.sql.datetime.java8API.enabled is false
DateType -> java.time.LocalDate if spark.sql.datetime.java8API.enabled is true

TimestampType -> java.sql.Timestamp if spark.sql.datetime.java8API.enabled is false
TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true

TimestampNTZType -> java.time.LocalDateTime

DayTimeIntervalType -> java.time.Duration
YearMonthIntervalType -> java.time.Period

BinaryType -> byte array
ArrayType -> scala.collection.Seq or Array
MapType -> scala.collection.Map
StructType -> org.apache.spark.sql.Row

三、show()

sql()方法会返回一个DataFrame(其实也是一个Dataset),因此show()也是Dataset身上的。

1、Dataset

  //以表格形式显示数据集的前20行。超过20个字符的字符串将被截断,所有单元格将向右对齐。def show(): Unit = show(20)//数据样例://year  month AVG('Adj Close) MAX('Adj Close)//1980  12    0.503218        0.595103//1981  01    0.523289        0.570307//1982  02    0.436504        0.475256//1983  03    0.410516        0.442194//1984  04    0.450090        0.483521def show(numRows: Int): Unit = show(numRows, truncate = true)//truncate 含义://是否截断长字符串。如果为true,则超过20个字符的字符串将被截断,所有单元格将正确对齐def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {println(showString(numRows, truncate = 20))} else {println(showString(numRows, truncate = 0))}//将查询到的数据构造成控制台可展示的字符串private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {//可展示的最大行数val numRows = _numRows.max(0)//从这里就看出来,toDF()函数执行就已经将数据拿到了,因此才可以take出前21条来//我们后面重点看toDF()做了什么val takeResult = toDF().take(numRows + 1)val hasMoreData = takeResult.length > numRowsval data = takeResult.take(numRows)lazy val timeZone =DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)// 对于数组值,将Seq和array替换为方括号。//对于超出“truncate”字符的单元格,将其替换为第一个“truncate-3”和“…”val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>row.toSeq.map { cell =>val str = cell match {case null => "null"case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")case array: Array[_] => array.mkString("[", ", ", "]")case seq: Seq[_] => seq.mkString("[", ", ", "]")case d: Date =>DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))case ts: Timestamp =>DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(ts), timeZone)case _ => cell.toString}if (truncate > 0 && str.length > truncate) {// 对于长度小于4个字符的字符串,不要显示省略号。if (truncate < 4) str.substring(0, truncate)else str.substring(0, truncate - 3) + "..."} else {str}}: Seq[String]}val sb = new StringBuilderval numCols = schema.fieldNames.length// 将每列的宽度初始化为最小值“3”val colWidths = Array.fill(numCols)(3)// 计算每列的宽度for (row <- rows) {for ((cell, i) <- row.zipWithIndex) {colWidths(i) = math.max(colWidths(i), cell.length)}}// 创建分隔线val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()//列名填充rows.head.zipWithIndex.map { case (cell, i) =>if (truncate > 0) {StringUtils.leftPad(cell, colWidths(i))} else {StringUtils.rightPad(cell, colWidths(i))}}.addString(sb, "|", "|", "|\n")sb.append(sep)// 数据填充rows.tail.map {_.zipWithIndex.map { case (cell, i) =>if (truncate > 0) {StringUtils.leftPad(cell.toString, colWidths(i))} else {StringUtils.rightPad(cell.toString, colWidths(i))}}.addString(sb, "|", "|", "|\n")}sb.append(sep)// 对于具有多个“numRows”记录的数据if (hasMoreData) {val rowsString = if (numRows == 1) "row" else "rows"sb.append(s"only showing top $numRows $rowsString\n")}sb.toString()}

2、构建一个新的Dataset对象

sql()返回的是一个新的Dataset吗,并不是,而是自带的伴生对象Dataset

而调用了toDF()后,真的会new一个Dataset出来

def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))

那这个时候我们就很有必要看看Dataset类中有什么属性和自动调起的方法了

class Dataset[T] private[sql](@transient val sparkSession: SparkSession,@DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,encoder: Encoder[T])extends Serializable {//sql()中的那个断言解析是在这里调用的queryExecution.assertAnalyzed()def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {//根据逻辑计划创建一个queryExecutionthis(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)}//这个应该是为了向前兼容def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {this(sqlContext.sparkSession, logicalPlan, encoder)}@transient private[sql] val logicalPlan: LogicalPlan = {// 对于各种命令(如DDL)和具有副作用的查询,我们强制立即执行查询,让这些副作用迅速发生。queryExecution.analyzed match {case c: Command =>//根据需要插入shuffle操作和内部行格式转换,为执行准备一个计划的[[SparkPlan]]//SparkPlan 再调用 executeCollect()  运行此查询,将结果作为数组返回。//最后还是调用RDD的collect(),运行一个Job来执行sqlLocalRelation(c.output, queryExecution.executedPlan.executeCollect())case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>LocalRelation(u.output, queryExecution.executedPlan.executeCollect())case _ =>queryExecution.analyzed}}//目前[[ExpressionEncoder]]是[[Encoder]]的唯一实现,//在这里我们显式地将传入的编码器转换为[[ExpressionEncoder]],并隐式标记它,//以便我们在构建具有相同对象类型(可能会解析为不同模式)的新Dataset对象时使用它。private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)//编码器主要用作Dataset中serde表达式的容器。//我们通过这些serde表达式构建逻辑计划,并在查询框架内执行。//但是,出于性能原因,我们可能希望使用编码器作为函数,将内部行反序列化为自定义对象,//例如collect。在这里,我们解析并绑定编码器,以便稍后调用它的`fromRow`方法。private val boundEnc =exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)//sqlContext必须为val,因为导入隐式时需要一个稳定的标识符@transient lazy val sqlContext: SQLContext = sparkSession.sqlContext//将数据集的内容表示为“T”的“RDD”lazy val rdd: RDD[T] = {val objectType = exprEnc.deserializer.dataTypeval deserialized = CatalystSerde.deserialize[T](logicalPlan)sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>rows.map(_.get(0, objectType).asInstanceOf[T])}}}

四、总结

源码已经大概型的过了一遍,下面我们把SparkSQL执行的整个过程来捋一下

sql()

构建逻辑计划:

    1、将sql字符串转换成大写的字符流
    2、用ANTLR4对其进行词法分析
    3、构建一个CommonTokenStream并进行解析
    4、对有歧义的情况做设置,比如1E-30应该被解析为Decimal还是Double
    5、首先,尝试使用可能更快的SLL模式进行解析,如果失败,则使用LL模式解析

构建DataFrame:

    1、根据逻辑计划创建QueryExecution
    2、断言分析sql预计的异常情况(如表、视图、库是否存在等)
    3、new一个RowEncoder,为DataFrame准备schema
    4、返回由预期数据(QueryExecution)和schema组成的DataFrame

show()

    1、用户可以设置展示多少行结果,默认是20行
    2、每列结果最多显示20个字符串,用户可以设置是截断还是...代替
    3、调用toDF() new一个新的Dataset 这里面会做两件事情(1、规则优化。2、转化为RDD进行任务提交)如果任务执行成功,最终会获取到结果数据
    4、3获取的是全量数据,需要根据用户设置的显示行数做截取
    5、设置每列的宽度(最小值为3个字符)、分割线、表头和数据
    6、控制台展示结果

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com