
Web ad placement _ H5 page builder _ reliable promotion platforms _ website SEO diagnostic analysis

2025/1/11 6:06:54  Source: https://blog.csdn.net/zhixingheyi_tian/article/details/143736600
Data Locality
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}
  • PROCESS_LOCAL:

The data to be processed is in the same local process, i.e. the data and the task are in the same Executor JVM. This happens when the RDD's data has been cached; no network transfer is needed, so it is the best locality level (but the data must have been cached first).

  • NODE_LOCAL:

(1) The data and the task are in different executors on the same node; or
(2) the data is on HDFS on the same node as the task.
Inter-process transfer is required, so this is slightly slower than PROCESS_LOCAL.

  • NO_PREF:

The data is equally fast to access from anywhere, i.e. there is effectively no data locality; this typically applies when reading from an external data source.

  • RACK_LOCAL:

The data and the task are on different nodes in the same rack, so network transfer is required; this is slower than NODE_LOCAL.

  • ANY:

The data and the task may be anywhere in the cluster. This gives the worst performance, and when it shows up it is usually worth investigating why.
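Because the five levels above are declared in order from most to least local, isAllowed reduces to a single comparison on the enum's ordering. A minimal standalone sketch (it mirrors the Spark-internal enum rather than importing it, since TaskLocality is private[spark]):

```scala
// Standalone copy of the locality enum to show how declaration order
// makes isAllowed a simple "<=" check on the underlying ids.
object LocalityDemo {
  object TaskLocality extends Enumeration {
    // Declared most-local first, so PROCESS_LOCAL.id == 0, ANY.id == 4.
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
    type TaskLocality = Value

    // A candidate locality is allowed if it is at least as local as the constraint.
    def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean =
      condition <= constraint
  }
}
```

Under a NODE_LOCAL constraint, PROCESS_LOCAL and NODE_LOCAL tasks may run, but RACK_LOCAL tasks may not.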

TaskSetManager

TaskSetManager.scala

  /** Add a task to all the pending-task lists that it should be on. */
  private[spark] def addPendingTask(
      index: Int,
      resolveRacks: Boolean = true,
      speculatable: Boolean = false): Unit = {
    // A zombie TaskSetManager may reach here while handling failed task.
    if (isZombie) return

    val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

      if (resolveRacks) {
        sched.getRackForHost(loc.host).foreach { rack =>
          pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
        }
      }
    }

    if (tasks(index).preferredLocations == Nil) {
      pendingTaskSetToAddTo.noPrefs += index
    }

    pendingTaskSetToAddTo.all += index
  }
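The bookkeeping above amounts to appending the task index into per-executor, per-host, and per-rack buckets derived from its preferred locations. A simplified, self-contained sketch; PendingBuckets and rackOf are illustrative stand-ins for Spark's pendingTasks structure and sched.getRackForHost, not Spark's actual types:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingBucketsDemo {
  // Simplified analogue of Spark's PendingTasksByLocality.
  final class PendingBuckets {
    val forExecutor = new HashMap[String, ArrayBuffer[Int]]
    val forHost     = new HashMap[String, ArrayBuffer[Int]]
    val forRack     = new HashMap[String, ArrayBuffer[Int]]
    val noPrefs     = new ArrayBuffer[Int]
    val all         = new ArrayBuffer[Int]
  }

  // Hypothetical rack resolver: maps "host1" -> "rack-1" for demo purposes.
  def rackOf(host: String): Option[String] = Some("rack-" + host.takeRight(1))

  // prefs: (host, optional executor id) pairs, like preferredLocations.
  def addPendingTask(b: PendingBuckets, index: Int,
                     prefs: Seq[(String, Option[String])]): Unit = {
    for ((host, execOpt) <- prefs) {
      execOpt.foreach(e => b.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index)
      b.forHost.getOrElseUpdate(host, new ArrayBuffer) += index
      rackOf(host).foreach(r => b.forRack.getOrElseUpdate(r, new ArrayBuffer) += index)
    }
    if (prefs.isEmpty) b.noPrefs += index // no locality preference at all
    b.all += index                        // every task also lands in `all`
  }
}
```

Note that a task with an executor preference is still added to the host and rack buckets, so it can be scheduled at a weaker locality level if the preferred executor is busy.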
TaskLocation

TaskLocation.scala

/**
 * A location that includes both a host and an executor id on that host.
 */
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}

/**
 * A location on a host.
 */
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

/**
 * A location on a host that is cached by HDFS.
 */
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}

private[spark] object TaskLocation {
  // We identify hosts on which the block is cached with this prefix.  Because this prefix contains
  // underscores, which are not legal characters in hostnames, there should be no potential for
  // confusion.  See RFC 952 and RFC 1123 for information about the format of hostnames.
  val inMemoryLocationTag = "hdfs_cache_"

  // Identify locations of executors with this prefix.
  val executorLocationTag = "executor_"
}
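The two tag prefixes are what make the three location kinds distinguishable once rendered to strings, since underscores cannot occur in hostnames. A standalone sketch of the round trip; the parse helper here is illustrative and is not Spark's TaskLocation.apply:

```scala
// Demo of the tag-prefix scheme: render each location kind the way the
// toString implementations above do, then recover the kind by prefix.
object LocationTagDemo {
  val inMemoryLocationTag = "hdfs_cache_"
  val executorLocationTag = "executor_"

  sealed trait Loc { def host: String }
  case class ExecutorLoc(host: String, executorId: String) extends Loc
  case class HostLoc(host: String) extends Loc
  case class HdfsCacheLoc(host: String) extends Loc

  def render(l: Loc): String = l match {
    case ExecutorLoc(h, e) => s"$executorLocationTag${h}_$e"
    case HostLoc(h)        => h
    case HdfsCacheLoc(h)   => inMemoryLocationTag + h
  }

  def parse(s: String): Loc =
    if (s.startsWith(inMemoryLocationTag)) {
      HdfsCacheLoc(s.stripPrefix(inMemoryLocationTag))
    } else if (s.startsWith(executorLocationTag)) {
      val rest = s.stripPrefix(executorLocationTag)
      // Split on the last underscore; hostnames cannot contain underscores,
      // and this sketch assumes executor ids do not either.
      val i = rest.lastIndexOf('_')
      ExecutorLoc(rest.substring(0, i), rest.substring(i + 1))
    } else {
      HostLoc(s)
    }
}
```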

Tasks are scheduled in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.

getCacheLocs

DAGScheduler.scala

  private[scheduler]
  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
    cacheLocs.synchronized {
      // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
      if (!cacheLocs.contains(rdd.id)) {
        // Note: if the storage level is NONE, we don't need to get locations from block manager.
        val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
          IndexedSeq.fill(rdd.partitions.length)(Nil)
        } else {
          val blockIds =
            rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
          blockManagerMaster.getLocations(blockIds).map { bms =>
            bms.map(bm => TaskLocation(bm.host, bm.executorId))
          }
        }
        cacheLocs(rdd.id) = locs
      }
      cacheLocs(rdd.id)
    }
  }
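Stripped of Spark's types, getCacheLocs is a memoized lookup: the first call for an RDD id computes one location list per partition (empty when the RDD is not persisted) and caches it, so the O(num tasks) subsequent calls only hit the map. A sketch under those assumptions, where lookupBlockLocations stands in for blockManagerMaster.getLocations and hosts are plain strings:

```scala
import scala.collection.mutable

object CacheLocsDemo {
  // rdd id -> one Seq of host names per partition, memoized.
  private val cacheLocs = mutable.HashMap[Int, IndexedSeq[Seq[String]]]()

  def getCacheLocs(rddId: Int, numPartitions: Int, persisted: Boolean)
                  (lookupBlockLocations: Int => IndexedSeq[Seq[String]]): IndexedSeq[Seq[String]] =
    cacheLocs.synchronized {
      if (!cacheLocs.contains(rddId)) {
        val locs =
          if (!persisted) IndexedSeq.fill(numPartitions)(Nil) // storage level NONE: skip the lookup
          else lookupBlockLocations(rddId)                    // ask where blocks actually live
        cacheLocs(rddId) = locs
      }
      cacheLocs(rddId)
    }
}
```

The unpersisted fast path matters: it avoids a round trip to the block manager master for RDDs that can never have cached blocks.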
