您的位置:首页 > 健康 > 养生 > 深圳网站优化服务_广东网络优化推广_优化水平_seo引擎优化工具

深圳网站优化服务_广东网络优化推广_优化水平_seo引擎优化工具

2024/10/6 16:26:04 来源:https://blog.csdn.net/tus00000/article/details/142313966  浏览:    关键词:深圳网站优化服务_广东网络优化推广_优化水平_seo引擎优化工具
深圳网站优化服务_广东网络优化推广_优化水平_seo引擎优化工具

扫描对象

运行时会使用runtime.gcDrain函数扫描工作缓冲区中的灰色对象,它会根据传入gcDrainFlags的不同选择不同的策略:

// 清空垃圾收集器的工作缓冲区
func gcDrain(gcw *gcWork, flags gcDrainFlags) {// 获取当前M上运行的Goroutinegp := getg().m.curg// 清空操作是否可被抢占preemptible := flags&gcDrainUntilPreempt != 0// 是否刷新后台信用flushBgCredit := flags&gcDrainFlushBgCredit != 0// 垃圾收集器是否在空闲时间运行idle := flags&gcDrainIdle != 0// 记录已扫描完成的工作量initScanWork := gcw.scanWork// int最大值,用于禁用检查checkWork := int64(1<<63 - 1)// 用于决定何时退出清空循环的函数var check func() bool// 如果是空闲或分时模式if flags&(gcDrainIdle|gcDrainFractional) != 0 {// 再完成drainCheckThreshold工作量后触发检查checkWork = initScanWork + drainCheckThreshold// 如果是空闲模式if idle {// 使用pollWork函数进行检查check = pollWork// 如果是分时模式} else if flags&gcDrainFractional != 0 {// 使用pollFractionalWorkerExit函数进行检查check = pollFractionalWorkerExit}}...
}

1.gcDrainUntilPreempt——当Goroutine的preempt字段被设置成true时返回;

2.gcDrainIdle——调用runtime.pollWork函数,当处理器上包含其他待执行Goroutine时返回;

3.gcDrainFractional——调用runtime.pollFractionalWorkerExit函数,当CPU占用率超过fractionalUtilizationGoal的20%时返回;

4.gcDrainFlushBgCredit——调用runtime.gcFlushBgCredit计算后台完成的标记任务量以减少并发标记期间的辅助垃圾收集的用户程序的工作量;

运行时会使用本地变量中的check函数检查当前是否应该退出标记任务并让出处理器。当我们做完准备工作后,就可以开始扫描全局变量中的根对象了,这也是标记阶段中需要最先被执行的任务:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {...// 如果下一个需要处理的根对象索引小于需要处理的根对象总数if work.markrootNext < work.markrootJobs {// 循环处理,当垃圾收集过程不可被抢占 || 未被抢占for !(preemptible && gp.preempt) {// 递增需处理的根对象索引,并获取递增前的值job := atomic.Xadd(&work.markrootNext, +1) - 1// 如果已处理完所有根对象if job >= work.markrootJobs {break}// 标记指定的根对象markroot(gcw, job)// 如果检查函数决定停止垃圾收集if check != nil && check() {goto done}}}...
}

扫描根对象需要使用runtime.markroot函数,该函数会扫描缓存、数据段、存放全局变量和静态变量的BSS段、Goroutine的栈内存;一旦完成了对根对象的扫描,当前Goroutine会开始从本地和全局的工作缓存池中获取待执行的任务:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {...// 循环处理,当垃圾收集过程不可被抢占 || 未被抢占for !(preemptible && gp.preempt) {// 如果全局工作队列为空if work.full == 0 {// 平衡全局工作队列和本地工作队列中的任务gcw.balance()}// 尝试快速从本地缓冲区获取对象bb := gcw.tryGetFast()// 如果获取失败if b == 0 {// 尝试从本地或全局队列获取对象,通常会加锁,速度较慢b = gcw.tryGet()// 如果还是获取失败if b == 0 {// 刷新写屏障缓冲区wbBufFlush(nil, 0)// 再次从本地或全局队列获取对象b = gcw.tryGet()}}// 如果还是没有获取到if b == 0 {break}// 对获取的对象b进行扫描scanobject(b, gcw)// 如果已完成的扫描量超过阈值if gcw.scanWork >= gcCreditSlack {// 将本地累计的工作量更新到全局计数器atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)// 如果需要flush后台信用if flushBgCredit {// 进行flush操作gcFlushBgCredit(gcw.scanWork - initScanWork)initScanWork = 0}// 减去已扫描完成的工作量checkWork -= gcw.scanWork// 重置已完成工作量gcw.scanWork = 0// 已经完成预定工作量if checkWork <= 0 {// 将checkWork重置为下一个周期的阈值checkWork += drainCheckThreshold// 检查是否需要停止垃圾收集if check != nil && check() {break}}}}...
}

扫描对象会使用runtime.scanobject,该函数会从传入的位置开始扫描,扫描期间会调用runtime.greyobject为找到的活跃对象上色。

func gcDrain(gcw *gcWork, flags gcDrainFlags) {...
done:if gcw.scanWork > 0 {atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)if flushBgCredit {gcFlushBgCredit(gcw.scanWork - initScanWork)}gcw.scanWork = 0}
}

当本轮扫描因为外部条件变化而中断时,该函数会通过runtime.gcFlushBgCredit记录这次扫描的内存字节数用于减少辅助标记的工作量。

内存中对象的扫描和标记过程涉及很多位操作和指针操作,相关代码实现比较复杂,这里就不展开介绍了,感兴趣的读者可以将runtime.gcDrain作为入口研究三色标记的具体过程。

写屏障

写屏障是保证Go语言并发标记安全的技术,我们需要使用混合写屏障维护对象图的弱三色不变性,然而写屏障的实现需要编译器和运行时的共同协作。在SSA中间代码生成阶段,编译器会使用cmd/compile/internal/ssa.writebarrier函数在StoreMoveZero操作中加入写屏障,生成如下所示的代码:

if writeBarrier.enabled {gcWriteBarrier(ptr, val)
} else {*ptr = val
}

当Go语言进入垃圾收集阶段时,全局变量runtime.writeBarrier中的enabled字段会被置成开启,所有的写操作都会调用runtime.gcWriteBarrier

TEXT runtime.gcWriteBarrier(SB),NOSPLIT,$28...// 获取线程本地存储TLS的基址到寄存器BXget_tls(BX)// 从TLS中获取当前Goroutine的指针到BXMOVL  g(BX), BX// 从Goroutine结构中获取M的指针到BXMOVL  g_m(BX), BX// 从M结构中获取P的指针到BXMOVL  m_p(BX), BX// 将P的写屏障缓冲区的下一个可写地址加载到寄存器CXMOVL  (p_wbBuf+wbBuf_next)(BX), CX// 获取下下个可写地址MOVL  8(CX), CX// 更新下一个可写地址的下一个可写地址MOVL  CX, (p_wbBuf+wbBuf_next)(BX)// 比较下下个可写地址和下一个可写地址的结尾CMPL  CX, (p_wbBuf+wbBuf_end)(BX)// 将寄存器AX中存储的要写入的值放到下一个缓冲区中MOVL  AX, -8(CX) // 记录值// 将DI寄存器指向的写操作目标地址的值(即被覆盖地址)存放到寄存器BX中MOVL  (DI), BX// 将目标槽位的当前值存放到缓冲区中MOVL  BX, -4(CX) // 记录*slot// 基于上面CMPL的比较操作的结果决定是否跳转到flush,如果buffer full,则跳转JEQ   flush
// 函数返回
ret:// 从栈中恢复寄存器的值MOVL  20(SP), CXMOVL  24(SP), BX// 将AX写入DI指向的地址,即实际的写操作MOVL  AX, (DI) // 触发写操作RETflush:...// 调用runtime.wbBufFlush刷新缓冲区,将缓冲区中的记录提交CALL  runtime.wbBufFlush(SB)...JMP   ret

在上述汇编函数中,DI寄存器是写操作数的目的地址,AX寄存器中存储了被覆盖的值,该函数会覆盖原来的值并通过runtime.wbBufFlush通知垃圾收集器将原值和新值加入当前处理器的工作队列,因为该写屏障的实现比较复杂,所以写屏障对程序性能影响较大,没有写屏障时只需一条指令完成的工作,现在需要几十条指令。

我们在上面提到过Dijkstra和Yuasa写屏障组成的混合写屏障在开启后,所有新创建的对象都需要被直接涂成黑色,该标记过程由runtime.gcmarknewobject完成的:

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {...// 如果垃圾收集正在进行if gcphase != _GCoff {// 标记新分配的对象,防止其被误回收gcmarknewobject(uintptr(x), size, scanSize)}...
}func gcmarknewobject(obj, size, scanSize uintptr) {// markBitsForAddr函数获取对象地址对应的标记位,然后调用setMarked进行标记markBitsForAddr(obj).setMarked()// 获取当前P的垃圾收集工作队列gcw := &getg().m.p.ptr().gcw// 累计已标记的字节数gcw.bytesMarked += uintptr64(size)// 累计扫描的工作量gcw.scanWork += int64(scanSize)
}

runtime.mallocgc会在垃圾收集开始后调用gcmarknewobject函数,通过markBitsForAddr函数获取对象对应的标记位runtime.markBits,并调用runtime.markBits.setMarked直接将新的对象涂成黑色。

标记辅助

为了保证用户程序分配内存的速度不会超出后台任务的标记速度,运行时还引入了标记辅助技术,它遵循一条简单且朴实的原则,分配多少内存就需要完成多少标记任务。每一个Goroutine都持有gcAssistBytes字段,这个字段存储了当前Goroutine辅助标记的对象字节数。在并发标记阶段期间,当Goroutine调用runtime.mallocgc分配新对象时,该函数会检查申请内存的Goroutine是否处于入不敷出的状态:

func malloc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {...var assistG *g// 如果垃圾收集器在运行阶段if gcBlackenEnabled != 0 {// 获取当前执行的GoroutineassistG = getg()// 如果当前Goroutine有关联的用户Goroutineif assistG.m.curg != nil {// 使用用户Goroutine进行辅助assistG = assistG.m.curg}// 减少计数器的值,表示对等量的垃圾回收工作进行辅助assistG.gcAssistBytes -= int64(size)// 如果计数器小于0,则触发标记辅助if assistG.gcAssistBytes < 0 {// 调用gcAssistAlloc进行标记辅助gcAssistAlloc(assistG)}}...return x
}

申请内存时调用的runtime.gcAssistAlloc和扫描内存时调用的runtime.gcFlushBgCredit分别负责“借债”和“还债”,通过这套债务管理系统,我们能保证Goroutine在正常运行的同时不会为垃圾收集造成太多压力,保证在达到堆大小目标时完成标记阶段。
在这里插入图片描述
每个Goroutine持有的gcAssistBytes表示当前协程辅助标记的字节数,全局垃圾收集控制器持有的bgScanCredit表示后台协程辅助标记的字节数,当本地Goroutine分配了较多的对象时,可以使用公用的信用bgScanCredit偿还(换句话说,如果本地Goroutine分配的对象较多,可以用后台Goroutine做的垃圾收集工作来抵消一部分需要自己完成的工作,这样本地Goroutine就可以少做些工作)。我们先分析runtime.gcAssistAlloc函数的实现:

// 辅助标记
func gcAssistAlloc(gp *g) {...
// 重新计算需要偿还的“债务”
retry:// 该Goroutine当前的辅助标记债务,即需要偿还的辅助标记字节数debtBytes := -gp.gcAssistBytes// 需要执行的垃圾收集工作量,字节数乘每个字节需要完成的垃圾收集工作量scanWork := int64(gcController.assistWorkPerByte * float64(debtBytes))// 如果垃圾收集工作量过小if scanWork < gcOverAssistWork {// 使用最小垃圾收集辅助标记工作量scanWork = gcOverAssistWork// 重新计算债务字节数debtBytes = int64(gcController.assistBytesPerWork * float64(scanWork))}// 获取全局的后台扫描信用bgScanCredit := atomic.Loadint64(&gcController.bgScanCredit)// 当前Goroutine可以从全局后台扫描信用中借用的工作量stolen := int64(0)// 如果有可用的后台信用if bgScanCredit > 0 {// 如果后台信用小于需要执行的工作量if bgScanCredit < scanWork {// 借用全部的后台信用stolen = bgScanCredit// 增加对应的辅助信用,加1应该是为了防止舍入误差gp.gcAssistBytes += 1 + int64(gcController.assistBytesPerWork*float64(stolen))} else {// 只借用所需的工作量的信用stolen = scanwork// 完全抵消债务,在scanWork < gcOverAssistWork时,gcAssistBytes还会变为正数gp.gcAssistBytes += debtBytes}// 原子地扣除全局信用atomic.Xaddint64(&gcController.bgScanCredit, -stolen)// 更新剩余工作量scanWork -= stolen// 如果债务已偿还,就不用进行辅助标记了if scanWork == 0 {return}}...
}

该函数会先根据Goroutine的gcAssistBytes和垃圾收集器的配置计算需要完成的标记任务数量,如果全局信用bgScanCredit中有可用的点数,那么就会减去该点数,因为并发执行没有加锁,所以全局信用可能会被更新成负值,然而在长期来看这不是一个重要的问题。

如果全局信用不足以覆盖本地债务,运行时会在系统栈中调用runtime.gcAssistAlloc1执行标记任务,该函数会直接调用runtime.gcDrainN完成指定数量的标记任务并返回:

func gcAssistAlloc(gp *g) {...// 系统栈上执行辅助标记任务systemstack(func() {gcAssistAlloc1(gp, scanWork)})...// 如果当前Goroutine还有债务if gp.gcAssistBytes < 0 {// 如果当前Goroutine被抢占if gp.preempt {// 让出处理器Gosched()// 重新计算需要偿还的债务goto retry}// gcParkAssist函数挂起当前Goroutine,直到垃圾收集标记阶段完成// 如果挂起失败if !gcParkAssist() {goto retry}}
}

如果在完成标记辅助任务后,当前Goroutine仍然入不敷出(指全局信用不能覆盖本地债务,且本地债务在执行完gcAssistAlloc1后还存在),并且Goroutine没有被抢占,那么运行时会执行runtime.gcParkAssist;在该函数中,如果全局信用依然不足,runtime.gcParkAssist会将当前Goroutine陷入休眠、加入全局的辅助标记队列,然后等待后台标记任务的唤醒。

用于还债的runtime.gcFlushBgCredit实现比较简单,如果辅助队列中不存在等待的Goroutine,那么当前信用会直接加到全局信用bgScanCredit中:

// 将后台垃圾收集器完成的扫描工作量转换为信用,分配给等待中的Goroutine,减少它们的辅助债务
func gcFlushBgCredit(scanWork int64) {// 如果没有Goroutine阻塞在辅助队列中if work.assistQueue.q.empty() {// 使用原子操作将全部scanWork加入全局信用atomic.Xaddint64(&gcController.bgScanCredit, scanWork)return}// 计算后台完成的工作量对应的辅助债务字节数scanBytes := int64(float64(scanWork) * gcController.assistBytesPerWork)// 循环处理,当辅助队列非空 && 还有信用可分配时for !work.assistQueue.q.empty() && scanBytes > 0 {// 从辅助队列中取出一个Goroutinegp := work.assistQueue.q.pop()// 如果可用信用可以完全覆盖债务,gp.gcAssistBytes是债务,它是一个负数if scanBytes+gp.gcAssistBytes >= 0 {// 分配信用给该GoroutinescanBytes += gp.gcAssistBytes// 清零债务gp.gcAssistBytes = 0// 唤醒Goroutine,将其放入运行队列ready(gp, 0, false)// 偿还部分债务} else {// 将可分配信用全部给该Goroutinegp.gcAssistBytes += scanBytes// 清零信用scanBytes = 0// 将Goroutine放回辅助队列,等待更多信用work.assistQueue.q.pushBack(gp)break}}// 如果还有剩余的信用if scanBytes > 0 {// 将其转换为工作量,然后原子地添加到全局信用中scanWork = int64(float64(scanBytes) * gcController.assistWorkPerByte)atomic.Xaddint64(&gcController.bgScanCredit, scanWork)}
}

如果辅助队列不为空,上述函数会根据每个Goroutine的债务数量和已完成的工作决定是否唤醒这些陷入休眠的Goroutine;如果唤醒所有Goroutine后,标记任务量仍有剩余,将这些标记任务量加到全局信用中。
在这里插入图片描述
用户程序辅助标记的核心目的是避免用户程序分配内存影响垃圾收集器完成标记工作的期望时间,它通过维护账户体系保证用户程序不会对垃圾收集造成过多负担,一旦用户程序分配了大量内存,该用户程序就会通过辅助标记的方式平衡账本,这个过程会在最后达到相对平衡,保证标记任务在到达期望堆大小时完成。

标记终止

当所有处理器的本地任务都完成且不存在剩余工作Goroutine时,后台并发任务或辅助标记的用户程序会调用runtime.gcMarkDone通知垃圾收集器。当所有可达对象都被标记后,该函数会将垃圾收集的状态切换至_GCmarktermination;如果本地队列中仍然存在待处理的任务,当前方法会将所有任务加入全局队列并等待其他Goroutine完成处理:

func gcMarkDone() {...
top:// 如果当前不处于标记阶段 || 所有标记线程都已完成工作进入等待 || 没有未完成的标记工作// 条件为true时,标记工作可能已完成if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {return}// 刷新写屏障后是否产生了新的标记工作gcMarkDoneFlushed = 0// 切换到系统栈刷新写屏障缓冲区systemstack(func() {// 获取当前正在运行的Goroutinegp := getg().m.curg// 将其停止运行casgstatus(gp, _Grunning, _Gwaiting)// 遍历所有PforEachP(func(_p_ *p) {// 刷新当前P的写屏障缓冲区wbBufFlush1(_p_)// 如果刷新写屏障缓冲区后产生了新的标记工作if _p_.gcw.flushedWork {// 原子地递增gcMarkDoneFlushedatomic.Xadd(&gcMarkDoneFlushed, 1)// 重置标记位_p_.gcw.flushedWork = false}})// 恢复Goroutine的运行casgstatus(gp, _Gwaiting, _Grunning)})// 如果有新的标记工作if gcMarkDoneFlushed != 0 {// 重新检查是否满足标记阶段结束的条件goto top}...
}

如果运行时中不包含全局任务、处理器中也不存在本地任务,那么当前垃圾收集循环中的灰色对象也就都标记成了黑色,我们就可以开始触发垃圾收集的阶段迁移了:

func gcMarkDone() {...// 禁止当前M被抢占,防止在关键的垃圾收集阶段被抢占getg().m.preemptoff = "gcing"// 在系统栈上执行stopTheWorldWithSema,停止所有Goroutine,只允许垃圾收集器和调度器运行systemstack(stopTheWorldWithSema)...// 原子地将gcBlackenEnabled改为0,之后新分配的对象不再立即对其进行标记atomic.Store(&gcBlackenEnabled, 0)// 唤醒所有辅助标记Goroutine,确保进入下一阶段时,没有Goroutine因辅助标记而被阻塞gcWakeAllAssists()// 重新激活调度器以允许用户Goroutine继续执行schedEnableUser(true)// 结束当前垃圾收集周期,并计算下次垃圾收集触发条件nextTriggerRatio := gcController.endCycle()// 标记终止阶段gcMarkTermination(nextTriggerRatio)
}

上述函数在最后会关闭混合写屏障、唤醒所有协助垃圾收集的用户Goroutine、恢复用户Goroutine的调度并调用runtime.gcMarkTermination进入标记终止阶段:

func gcMarkTermination(nextTriggerRatio float64) {atomic.Store(&gcBlackenEnabled, 0)// 进入标记终止阶段setGCPhase(_GCmarktermination)// 获取当前执行的系统Goroutine_g_ := getg()// 获取系统Goroutine执行的M上的用户Goroutinegp := _g_.m.curg// 将用户Goroutine状态从_Grunning改为_Gwaitingcasgstatus(gp, _Grunning, _Gwaiting)// 系统栈上执行,完成剩余的标记工作,确保所有需要标记的对象都已被处理器systemstack(func() {gcMark(startTime)})// 系统栈上执行systemstack(func() {// 进入GC关闭阶段setGCPhase(_GCoff)// 执行清除工作,回收未被标记的对象,释放内存gcSweep(work.mode)})// 恢复用户Goroutine的运行casgstatus(gp, _Gwaiting, _Grunning)// 设置下一次GC周期的触发条件gcSetTriggerRatio(nextTriggerRatio)// 唤醒内存回收器,释放OS分配但未被使用的内存wakeScavenger()...// 唤醒等待进行清除工作的Goroutineinjectglist(&work.sweepWaiters.list)// 停止STWsystemstack(func() { startTheWorldWithSema(true) })// 为清除工作准备缓冲区prepareFreeWorkbufs()// 回收尚未使用的stack spans(即未使用的栈空间)systemstack(freeStackSpans)systemstack(func() {// 遍历所有PforEachP(func(_p_ *p) {// 为P进行缓存清除工作的准备_p_.mcache.prepareForSweep()})})...
}

我们省略了函数中很多数据统计的代码,包括正在使用的内存大小、本轮垃圾收集的暂停时间、CPU利用率等数据,这些数据能帮助控制器决定下一轮触发垃圾收集的堆大小,除了数据统计外,该函数还会调用runtime.gcSweep重置清理阶段的相关状态,并在需要时阻塞清理所有的内存管理单元;_GCmarktermination状态在垃圾收集中不会持续太久,它会迅速转换至_GCoff并恢复应用程序,到这里垃圾收集的全过程基本就结束了,用户程序在申请内存时才会惰性回收内存。

内存清理

垃圾收集的清理中包含对象回收器(Reclaimer)和内存单元回收器,这两种回收器使用不同算法清理堆内存:
1.对象回收器在内存管理单元中查找并释放未被标记的对象,但如果runtime.mspan中的所有对象都没有被标记,整个单元就会被直接回收,该过程会被runtime.mcentral.cacheSpanruntime.sweepone异步触发;

2.内存单元回收器会在内存中查找所有对象都未被标记的runtime.mspan,该过程会被runtime.mheap.reclaim触发;

runtime.sweepone是我们在垃圾收集过程中经常会见到的函数,该函数会在堆内存中查找待清理的内存管理单元:

// 执行一次清扫操作,尝试回收一个span,并返回回收的页面数
func sweepone() uintptr {...var s *mspan// 获取堆当前的清扫代数sg := mheap_.sweepgenfor {// 通过清扫代数确定清扫列表索引s = mheap_.sweepSpans[1-sg/2%2].pop()// 如果没有span需要清扫if s == nil {break}// 如果当前span状态是mSpanInUseif state := s.state.get(); state != mSpanInUse {continue}// 确保该span的清扫代数为sg-2,且将其改为sg-1成功,sg-1表示正在被清扫if s.sweepgen == sg-2 && atomic.Cas(&s.sweepgen, sg-2, sg-1) {// 跳出循环,准备清扫sbreak}}// 初始化回收的页面数,全1的值表示未回收npages := ^uintptr(0)if s != nil {// 该span包含的页面数npages = s.npages// 执行实际的清扫工作,如果成功if s.sweep(false) {// 将回收的页面数累加到mheap_.reclaimCreditatomic.Xadduintptr(&mheap_.reclaimCredit, npages)} else {// 清扫失败,将清扫的页面数设为0npages = 0}}// 减少当前G对应的M的锁计数,释放锁资源_g_.m.locks--return npages
}

查找内存管理单元时会通过statesweepgen两个字段判断当前单元是否需要处理。如果内存单元的sweepgen等于mheap.sweepgen-2,那么就意味着当前单元需要被清理,如果等于mheap.sweepgen - 1,那么当前管理单元就正在被清理。

所有回收工作最终都是靠runtime.mspan.sweep完成的,该函数会根据并发标记阶段的结果回收内存单元中的垃圾并清除标记以免影响下一轮垃圾收集。

7.2.4 小结

Go语言垃圾收集器的实现非常复杂,作者认为这是编程语言中最复杂的一个模块,调度器的复杂度与垃圾收集器完全不是一个级别,我们在分析垃圾收集器的过程中不得不省略很多实现细节,其中包括并发标记对象的过程、清扫垃圾的具体实现,这些过程涉及大量底层的位操作和指针操作,感兴趣的读者可以自行探索(github上可以找到源码)。

垃圾收集是一门非常古老的技术,它的执行速度和利用率很大程度上决定了程序的运行速度,Go语言为了实现高性能的并发垃圾收集器,使用三色抽象、并发增量回收、混合写屏障、调步算法、用户程序协助等机制将垃圾收集的暂停时间优化至毫秒级以下,从早期的版本看到今天,我们能体会到其中的工程设计和演进,作者觉得分析垃圾收集的实现还是非常有趣和值得的。

7.3 栈空间管理

应用程序的内存一般会分成堆区和栈区两个部分,程序在运行期间可以主动从堆区申请内存空间,这些内存由内存分配器分配并由垃圾收集器负责回收,我们在上两节已经详细分析了堆内存的申请和释放过程,本节会介绍Go语言栈内存的实现原理。

7.3.1 设计原理

栈区的内存一般由编译器自动分配和释放,其中存储着函数的入参以及局部变量,这些参数会随着函数的创建而创建,函数的返回而消亡,这种线性的内存分配策略有着极高的效率,但工程师也往往不能控制栈内存的分配,这部分工作基本是由编译器自动完成的。

寄存器

寄存器是中央处理器(CPU)中的稀缺资源,它的存储能力非常有限,但是能提供最快的读写速度,充分利用寄存器的效率可以构建高性能的应用程序。寄存器在物理机上非常有限,然而栈区的操作就会使用到两个以上寄存器,这足以说明栈内存在应用程序中的重要性。

栈寄存器是CPU寄存器中的一种,它的主要作用是跟踪函数的调用栈,Go语言的汇编代码中包含BP和SP两个栈寄存器,它们分别存储了栈的基址和栈顶的地址,栈内存与函数调用的关系非常紧密,我们在函数调用一节中曾介绍过栈区,BP和SP之间的内存就是当前函数的调用栈。
在这里插入图片描述
由于历史的设计问题,目前的栈区都是从高地址向低地址扩展的,当应用程序申请或释放栈内存时只需修改SP寄存器的值,这种线性的内存分配方式与堆内存相比更加快速,占用极少的额外开销。

线程栈

如果我们在Linux中执行pthread_create系统调用,进程会启动一个新的线程,如果用户没有通过软资源限制RLIMIT_STACK指定线程栈的大小,那么操作系统会根据架构选择不同的默认栈大小。
在这里插入图片描述
多数架构上默认栈大小都在2~4MB左右,极少数架构会使用32MB作为默认大小,用户程序可以在分配的栈上存储函数参数和局部变量。然而这个固定的栈大小在某些场景下可能不是一个合适的值,如果一个程序需要同时运行几百个甚至上千个线程,那么这些线程中的绝大部分都只会用到很少的栈空间,而如果函数的调用栈非常深,固定的栈大小也无法满足用户程序的需求。

线程和进程都是代码执行的上下文(Context of Execution),但是如果一个应用程序中包含成百上千个执行上下文且每个上下文都是线程,就会占用大量的内存空间并带来其他的额外开销,Go语言在设计时认为执行上下文应该是轻量级的,所以它实现了用户级的Goroutine作为执行上下文。

逃逸分析

在C和C++这类需要手动管理内存的编程语言中,将对象或结构体分配到栈上或堆上是由工程师自主决定的,这也为工程师的工作带来了挑战,如果工程师能够精准地为每一个变量分配最合理的空间,那么整个程序的运行效率和内存使用效率一定是最高的,但手动分配内存会导致如下两个问题:
1.不需要分配到堆上的对象分配到了堆上——浪费内存空间;

2.需要分配到堆上的对象分配到了栈上——悬挂指针、影响内存安全;

与悬挂指针相比,浪费的内存空间反而是小问题。在C中,栈上的变量被函数作为返回值返回给调用方是一个常见的错误,如下代码中,栈上变量i被错误地返回:

int *dangling_pointer() {int i = 2;return &i;
}

dangling_pointer函数返回后,它的本地变量就会被编译器回收,调用方获取的是危险的悬挂指针,我们不确定当前指针指向的值是否合法,这种问题在大型项目中是比较难以定位和发现的。

在编译器优化中,逃逸分析(Escape analysis)是用来决定指针动态作用域的方法。Go的编译器使用逃逸分析决定哪些变量应该在栈上分配,哪些变量应该在堆上分配,其中包括使用newmake、字面量等方法隐式分配的内存,Go的逃逸分析遵循以下两个不变性:
1.指向栈对象的指针不能存在于堆中;

2.指向栈对象的指针不能在栈对象回收后存活;
在这里插入图片描述
我们通过上图展示两条不变性存在的意义,当我们违反了第一条不变性,堆上的绿色指针指向了栈中的黄色内存,一旦当前函数返回,函数栈被回收,该绿色指针指向的值就不再合法;如果我们违反了第二条不变性,因为寄存器SP下面的内存由于函数返回已经被释放掉,所以黄色指针指向的内存已不再合法。

逃逸分析是静态分析的一种,在编译器解析了Go语言源文件后,它可以获得整个程序的抽象语法树(Abstract syntax tree,AST),编译器可以根据抽象语法树分析静态的数据流,我们会通过以下几个步骤实现静态分析的全过程:
1.构建带权重的有向图,其中顶点cmd/compile/internal/gc.EscLocation表示被分配的变量,边cmd/compile/internal/gc.EscEdge表示变量之间的分配关系,权重表示寻址和取地址的次数。

2.遍历对象分配图并查找违反两条不变性的变量分配关系,如果堆上的变量指向了栈上的变量,那么栈上的变量就需要分配在堆上;

3.记录函数的调用参数到堆(可能参数对象中有一个指针指向函数中分配的对象,此时需要将该对象放到堆上)的数据流,以及函数返回值的数据流,增量函数参数的逃逸分析;

决定变量是在栈上还是堆上很重要,而且这是一个定义相对清晰的问题,我们可以通过编译器统一做出决策。为了保证内存的绝对安全,编译器可能会将一些变量错误地分配到堆上,但因为这些对象也会被垃圾收集器处理,所以不会造成内存泄漏、悬挂指针等安全问题,解放了工程师的生产力。

栈内存空间

Go使用用户态线程Goroutine作为执行上下文,它的额外开销和默认栈大小都比线程小很多,然而Goroutine的栈内存空间和栈结构也在早期几个版本中发生过一些变化:
1.v1.0~v1.1——最小栈内存空间为4KB;

2.v1.2——将最小栈内存提升到了8KB;

3.v1.3——使用连续栈替换之前版本的分段栈;

4.v1.4——将最小栈内存降低到了2KB;

Goroutine的初始栈内存在最初的几个版本中多次修改,从4KB提升到8KB是临时的解决方案,其目的是为了减轻分段栈的栈分裂问题对程序造成的性能影响;在v1.3版本引入连续栈后,Goroutine的初始栈大小降低到了2KB,进一步减少了Goroutine占用的内存空间。

分段栈

分段栈是Go在v1.3版本之前的实现,所有Goroutine在初始化时都会调用runtime·stackalloc#go1.2分配一块固定大小的内存空间,这块内存的大小由StackMin#go1.2表示,在v1.2版本中为8KB:

// 为Goroutine分配栈空间
void *runtime·stackalloc(uint32 n) {uint32 pos;void *v;// 如果请求的栈大小是固定大小 || 当前线程M正在进行内存分配 || 当前线程M正在进行垃圾回收if(n == FixedStack || m->mallocing || m->gcing) {// 如果栈缓存计数为0,表示缓存已空if(m->stackcachecnt == 0)// 重新填充栈缓存stackcacherefill();// 获取缓存位置指针pos = m->stackcachepos;// 计算新的缓存位置指针,通过取模操作实现循环缓存pos = (pos - 1) % StackCacheSize;// 从缓存位置取出栈指针v = m->stackcache[pos]// 更新缓存位置指针m->stackcachepos = pos// 递减缓存计数m->stackcachecnt--;// 递增正在使用的栈计数m->stackinuse++;// 返回成功分配的栈空间return v;}// 直接分配新的栈空间// FlagNoProfiling表示不进行栈分析// FlagNoGC表示不触发垃圾回收// FlagNoZero表示分配的内存不初始化为0// FlagNoInvokeGC表示不主动调用垃圾回收器return runtime·mallocgc(n, 0, FlagNoProfiling|FlagNoGC|FlagNoZero|FlagNoInvokeGC);
}

如果通过该方法申请的内存大小为固定的8KB或满足其他条件,运行时会在全局的栈缓存链表中找到空闲的内存块并作为新Goroutine的栈空间返回;其余情况下,栈空间会从堆上申请一块合适的内存。

当Goroutine调用的函数层级或需要的局部变量越来越多时,运行时会调用runtime.morestack#go1.2和runtime.newstack#go1.2创建一个新的栈空间,这些栈空间虽然不连续,但是当前Goroutine的多个栈空间会以链表形式串联起来,运行时会通过指针找到连续的栈片段:
在这里插入图片描述
一旦Goroutine申请的栈空间不再被需要,运行时会调用runtime.lessstack#go1.2runtime.oldstack#go1.2释放不再使用的内存空间。

分段栈机制虽然能够按需为当前Goroutine分配内存且及时减少内存占用,但它也存在两个比较大的问题:
1.如果当前Goroutine的栈几乎充满,那么任意的函数调用都将触发栈扩容,当函数返回后又会触发栈收缩,如果在一个循环中调用函数,栈的分配和释放就会造成巨大的额外开销,这称为热分裂问题(Hot split);

2.一旦Goroutine使用的内存越过了分段栈的扩缩容阈值,运行时就会触发栈的扩容和缩容,带来额外的工作量;

连续栈

连续栈可以解决分段栈中存在的两个问题,其核心原理就是每当程序的栈空间不足时,初始化一片更大的栈空间并将原栈中的所有值都迁移到新栈中,新的局部变量或函数调用就有了充足的内存空间。使用连续栈机制时,栈空间不足导致的扩容会经历以下几个步骤:
1.在内存空间中分配更大的栈内存空间;

2.将旧栈中的所有内存复制到新栈中;

3.将指向旧栈对应变量的指针重新指向新栈;

4.销毁并回收旧栈的内存空间;

在扩容过程中,最重要的是调整指针的第三步,这一步能保证指向栈的指针的正确性,因为栈中的所有变量内存都会发生变化,所以原本指向栈中变量的指针也需要调整。我们在前面提到过经过逃逸分析的Go语言程序遵循以下不变性——指向栈对象的指针不能存在于堆中,所以指向栈中变量的指针只能在栈上,我们只需要调整栈中的所有变量就可以保证内存安全了。
在这里插入图片描述
因为需要拷贝变量和调整指针,连续栈增加了栈扩容时的额外开销,但通过合理的栈缩容机制就能避免热分裂带来的性能问题,在GC期间如果Goroutine使用了栈内存的四分之一,那就将其栈内存减少一半,这样在栈内存几乎充满时也只会扩容一次(然后函数返回时不会缩容,只有GC期间会缩容),不会因为函数调用频繁扩缩容。

7.3.2 栈操作

Go语言中的执行栈由runtime.stack结构体表示,该结构体中只包含两个字段,分别表示栈的顶部和栈的底部,每个栈结构体都表示范围[lo, hi)的内存空间:

type stack struct {lo uintptrhi uintptr
}

栈的结构虽然简单,但想要理解Goroutine栈的实现原理,还是需要我们从编译期间和运行时两个阶段入手:
1.编译器会在编译阶段通过cmd/internal/obj/x86.stacksplit在调用函数前插入runtime.morestackruntime.morestack_noctxt函数;

2.运行时在创建新的Goroutine时会在runtime.malg函数中调用runtime.stackalloc申请新的栈内存,并在编译器插入的runtime.morestack中检查栈空间是否充足;

需要注意的是,Go编译器不会为所有函数插入runtime.morestack,它只会在必要时插入指令以减少运行时的额外开销,编译指令nosplit可以跳过栈溢出检查,虽然这能降低一些开销,但固定大小的栈也存在溢出风险。本节将分析栈的初始化、创建Goroutine时栈的分配、编译器和运行时协作完成的栈扩容、当栈空间利用率不足时的缩容过程。

栈初始化

栈空间在运行时中包含两个重要的全局变量,分别是runtime.stackpoolruntime.stackLarge,这两个变量分别表示全局的栈缓存和大栈缓存,前者可以分配小于32KB的内存,后者用来分配大于32KB的栈空间(那32KB的栈空间从哪取呢?):

var stackpool [_NumStackOrders]struct {item stackpoolItem_    [cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize]byte
}type stackpoolItem struct {mu   mutexspan mSpanList
}var stackLarge struct {lock mutexfree [heapAddrBits - pageShift]mSpanList
}

这两个用于分配空间的全局变量都与内存管理单元runtime.mspan有关,我们可以认为Go的栈内存都是分配在堆上的,运行时初始化时调用的runtime.stackinit函数会初始化这些全局变量:

func stackinit() {for i := range stackpool {stackpool[i].item.span.init()}for i := range stackLarge.free {stackLarge.free[i].init()}
}

从调度器和内存分配的经验来看,如果运行时只使用全局变量来分配内存的话,势必会造成线程之间的锁竞争进而影响程序的执行效率,栈内存由于与线程关系比较密切,所以我们在每一个线程缓存runtime.mcache中都加入了栈缓存减少锁竞争影响。

type mcache struct {stackcache [_NumStackOrders]stackfreelist
}type stackfreelist struct {list gclinkptrsize uintptr
}

在这里插入图片描述
运行时使用全局的runtime.stackpool和线程缓存中的空闲链表分配32KB以下的栈内存,使用全局的runtime.stackLarge和堆内存分配32KB以上的栈内存,提高本地分配栈内存的性能。

栈分配

运行时会在Goroutine的初始化函数runtime.malg中调用runtime.stackalloc分配一个大小足够的栈内存空间,根据线程缓存和申请栈的大小,该函数会通过三种不同方法分配栈空间:
1.如果栈空间较小,使用全局栈缓存或线程缓存上固定大小的空闲链表分配内存;

2.如果栈空间较大,从全局的大栈缓存runtime.stackLarge中获取内存空间;

3.如果栈空间较大且runtime.stackLarge空间不足,在堆上申请一片大小足够的内存空间;

我们在这里会按照栈的大小分两部分介绍运行时对栈空间的分配。在Linux上,_FixedStack = 2048_NumStackOrders = 4_StackCacheSize = 32768,也就是如果申请的栈空间小于32KB时,我们会在全局栈缓存吃或线程的栈缓存中初始化内存:

func stackalloc(n uint32) stack {// 获取当前正在执行的Goroutine上下文thisg := getg()// 用于存储分配到的栈空间指针var v unsafe.Pointer// 如果n小于允许的最大栈大小 && n小于栈缓存的最大大小if n < _FixedStack<<_NumStackOrders && n < _StackCacheSize {order := uint8(0)n2 := n// 将n2每次除2,直到其值小于等于_FixedStackfor n2 > _FixedStack {// 除2的次数order++n2 >>= 1}var x gclinkptr// 获取当前M的mcachec := thisg.m.mcache// 如果禁用了栈缓存 || 当前M没有缓存 || 当前M处于不可抢占状态if stackNoCache != 0 || c == nil || thisg.m.preemptoff != "" {// 直接从全局的栈缓冲池中分配x = stackpoolalloc(order)// 从缓存中分配} else {// 从当前M的stackcache中获取对应order的栈缓存链表头x = c.stackcache[order].list// 如果链表为空if x.ptr() == nil {// 重新填充M本地缓存stackcacherefill(c, order)// 再次获取对应order的栈缓存链表x = c.stackcache[order].list}// 更新链表,去掉链表头c.stackcache[order].list = x.ptr().next// 减少剩余的缓存大小c.stackcache[order].size -= uintptr(n)}v = unsafe.Pointer(x)} else {...}...
}

runtime.stackpoolalloc函数会在全局的栈缓存池runtime.stackpool中获取新的内存,如果栈缓冲池中不包含剩余的内存,运行时会从堆上申请一片内存空间;如果线程缓存中包含足够的空间,我们可以从线程本地的缓存中获取内存,一旦发现空间不足就会调用runtime.stackcacherefill从堆上获取新的内存。

如果Goroutine申请的内存空间过大,运行时会查看runtime.stackLarge中是否有剩余空间,如果不存在剩余空间,它也会从堆上申请新的内存:

func stackalloc(n uint32) stack {...if n < _FixedStack<<_NumStackOrders && n < _StackCacheSize {...} else {var s *mspannpage := uintptr(n) >> _PageShiftlog2npage := stacklog2(npage)if !stackLarge.free[log2npage].isEmpty() {s = stackLarge.free[log2npage].firststackLarge.free[log2npage].remove(s)}if s == nil {s = mheap_.allocManual(npage, &memstats.stacks_inuse)osStackAlloc(s)s.slemsize = uintptr(n)}v = unsafe.Pointer(s.base())}return stack{uintptr(v), uintptr(v) + uintptr(n)}
}

需要注意的是,因为OpenBSD 6.4+对栈内存有特殊的需求,所以只要我们从堆上申请栈内存,就需要调用runtime.osStackAlloc函数做一些额外的处理,然而其他的操作系统就没有这种限制了。

栈扩容

编译器会在cmd/internal/obj/x86.stacksplit函数中为函数调用插入runtime.morestack运行时检查,它会在几乎所有的函数调用前检查当前Goroutine的栈内存是否充足,如果当前栈需要扩容,我们会保存一些栈的相关信息并调用runtime.newstack创建新的栈:

func newstack() {// 获取当前Goroutinethisg := getg()// 获取当前M正在运行的Goroutinegp := thisg.m.curg...// 如果需要抢占当前Goroutinepreempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreemptif preempt {// 如果当前M无法被抢占(正在进行关键操作,如持有锁、内存分配等)// 抢占的目的是抢占用户代码,而非运行时代码的关键操作if !canPreemptM(thisg.m) {// 更新栈保护值,防止再次触发的抢占gp.stackguard0 = gp.stack.lo + _StackGuard// 恢复Goroutine在中断时的执行点,longjmpgogo(&gp.sched)}}// 获取调度器的栈指针sp := gp.sched.sp// 如果需要抢占当前Goroutineif preempt {// 如果需要缩减栈空间if gp.preemptShrink {// 重置标志位,表明已经被处理gp.preemptShrink = false// 执行栈缩减工作shrinkstack(gp)}// 如果需要停止当前协程if gp.preemptStop {// 重新挂起当前Goroutine,该函数不会返回,会将控制权转移给调度器preemptPark(gp)}// 就像Goroutine调用了runtime.Gosched,让出处理器,被抢占成功gppreempt_m(gp)}...
}

runtime.newstack会先做一些准备工作并检查当前Goroutine是否发出了抢占请求,如果发出了抢占请求:
1.当前线程可以被抢占时,调用runtime.gogo触发调度器的调度(然而根据以上代码,这一条是错误的,当M不能被抢占时,才上调栈扩容的保护值,然后触发调度,此处的调度指的是将Goroutine longjmp回参数保存的状态);

2.如果当前Goroutine在垃圾回收时被runtime.scanstack函数标记成了需要收缩栈,调用runtime.shrinkstack

3.如果当前Goroutine被runtime.suspendG函数挂起,调用runtime.preemptPark被动让出当前处理器的控制权并将Goroutine的状态修改至_Gpreempted

4.调用runtime.gopreempt_m主动让出当前处理器的控制权;

如果当前Goroutine不需要被抢占,意味着我们需要新的栈空间来支持函数调用和本地变量的初始化,运行时会先检查目标大小的栈是否会溢出:

func newstack() {...// 计算当前Goroutine栈大小oldsize := gp.stack.hi - gp.stack.lo// 新栈大小为旧栈的2倍,常见策略,避免频繁的小幅度扩展newsize := oldsize * 2// 如果新栈大小超过最大栈大小if newsize > maxstacksize {print("runtime: goroutine stack exceeds ", maxstacksize, "-byte limit\n")print("runtime: sp=", hex(sp), " stack=[", hex(gp.stack.lo), ", ", hex(gp.stack.hi), "]\n")throw("stack overflow")}// 将Goroutine状态从_Grunning改为_Gcopystackcasgstatus(gp, _Grunning, _Gcopystack)// 复制协程栈到一个更大的新栈copystack(gp, newsize)// 将Goroutine状态从_Gcopystack改为_Grunningcasgstatus(gp, _Gcopystack, _Grunning)// 恢复Goroutine运行gogo(&gp.sched)
}

如果目标栈的大小没有超出程序限制,我们会将Goroutine切换至_Gcopystack状态并调用runtime.copystack开始栈的拷贝,在拷贝栈的内存之前,运行时会通过runtime.stackalloc函数分配新的栈空间:

func copystack(gp *g, newsize uintptr) {// 保存原栈的使用信息old := gp.stack// 计算原栈实际使用的大小used := old.hi - gp.sched.sp// 分配新栈new := stackalloc(uint32(newsize))...
}

新栈的初始化和数据的复制是一个比较简单的过程,这不是整个过程中最复杂的地方,我们还需要将指向原栈中内存的指针指向新栈,在这期间我们需要调整以下指针:
1.调用runtime.adjustsudogsruntime.syncadjustsudogs调整runtime.sudog结构体的指针;

2.调用runtime.memmove将原栈中的整片内存拷贝到新栈中;

3.调用runtime.adjustctxtruntime.adjustdefersruntime.adjustpanics调整剩余Goroutine相关数据结构的指针:

func copystack(gp *g, newsize uintptr) {...// 记录栈调整相关信息var adjinfo adjustinfoadjinfo.old = oldadjinfo.delta = new.hi - old.hi // 计算新栈和旧栈之间内存地址差// 需要复制的数据量ncopy := used// 如果没有活跃的栈上channelif !gp.activeStackChans {// 直接调整sudogs(等待在channel上的所有Goroutine),使其中的sudog指向新栈的正确位置adjustsudogs(gp, &adjinfo)// 有活跃channel,说明其他Goroutine正通过channel向原栈写数据} else {// 找到所有sudog中指向原栈的最高地址// 复制比这个地址更高的部分是安全的,因为这部分未被channel操作影响adjinfo.sghi = findsghi(gp, old)// syncadjustsudogs函数同步地进行栈内容拷贝和sudog的调整,它已经返回安全处理的栈大小// 更新ncopy,只用复制栈最开始的高位部分ncopy -= syncadjustsudogs(gp, used, &adjinfo)}// 从原栈拷贝内容到新栈memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)// 调整指向Goroutine栈的其他结构体,包括上下文、延迟调用链表、panic链表adjustctxt(gp, &adjinfo)adjustdefers(gp, &adjinfo)adjustpanics(gp, &adjinfo)// 更新栈信息gp.stack = newgp.stackguard0 = new.lo + _StackGuardgp.sched.sp = new.hi - usedgp.stktopsp += adjinfo.delta...// 释放旧栈内存stackfree(old)
}

调整指向栈内存的指针都会调用runtime.adjustpointer,该函数会利用runtime.adjustinfo计算的新栈和旧栈之间的内存地址差来调整指针。所有指针都被调整后,我们就可以更新Goroutine的几个变量并通过runtime.stackfree释放原始栈的内存空间了。

栈缩容

runtime.shrinkstack是用于栈缩容的函数,该函数的实现原理非常简单,其中大部分都是检查是否满足缩容前置条件的代码,核心逻辑只有以下几行:

func shrinkstack(gp *g) {...// 计算原栈大小oldsize := gp.stack.hi - gp.stack.lo// 新栈大小为原栈大小的一半newsize := oldsize / 2// 如果新栈大小过小,则不进行缩容if newsize < _FixedStack {return}// 计算栈可用大小avail := gp.stack.hi - gp.stack.lo// 如果已用大小超过栈大小的1/4,则不进行缩容// 已用大小会加上一个安全边界,为的是留出空间给nosplit的函数if used := gp.stack.hi - gp.sched.sp + _StackLimit; used >= avail/4 {return}// 执行栈缩容copystack(gp, newsize)
}

如果要出发占的缩容,新栈的大小会是原始栈的一半,不过如果新栈的大小低于最低限制2KB,那么缩容过程就会停止。
在这里插入图片描述
运行时只会在栈内存使用不足1/4时进行缩容,缩容也会调用扩容时使用的runtime.copystack函数开辟新的栈空间。

7.3.3 小结

栈内存是应用程序中重要的内存空间,它能够支持本地的局部变量和函数调用,栈空间中的变量会与栈一同创建和销毁,这部分内存空间不需要工程师过多地干预和管理,现代的编程语言通过逃逸分析减少了我们的工作量,理解栈空间的分配对于理解Go语言的运行时有很大帮助。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com