您的位置:首页 > 新闻 > 热点要闻 > 建筑工程招聘最新信息平台_网站收费吗_企业为何选择网站推广外包?_排行榜前十名

建筑工程招聘最新信息平台_网站收费吗_企业为何选择网站推广外包?_排行榜前十名

2025/1/5 7:55:36 来源:https://blog.csdn.net/qq_35263706/article/details/144235878  浏览:    关键词:建筑工程招聘最新信息平台_网站收费吗_企业为何选择网站推广外包?_排行榜前十名
建筑工程招聘最新信息平台_网站收费吗_企业为何选择网站推广外包?_排行榜前十名

笔记1:读操作包括两种,readIndex和serilizable,readIndex指一致性读,一旦a读到了数据x,那么a及a以后的数据都能读到x,readIndex读会先确认本leader是不是有效地leader,如果有效则记录此刻的commiteIndex作为confirmIndex,等到applyIndex>confirmIndex时就可以进行serilizable读了,而serilizableRead就是副本读,直接读leader的数据。

笔记2:a、b、c三个readIndex读请求先后到达etcd,他们对应的confirmIndex分别为xa,xb,xc,且xa<=xb<=xc,applyIndex>xc时,会通过close(ch)来一次性唤醒三个a、b、c这三个请求,但是使用close(chan)的方式唤醒时,这三个请求被唤醒时的顺序是随机的,close(ch)只是一个唤醒操作,唤醒以后读请求开始执行,最终会来到range,range函数开始执行的时候会先把请求包装到一个读事务中,这个读事务会保存读事务创建时etcdserver的currentRev即etcdserver此刻的最大版本号,也就是说readIndex是读最新的数据,虽然请求对应的confirmIndex有大小有先后,但是这只是一个要求,一旦满足要求就读最新的数据是最新的数据,读事务采用的是concurrent模式,也就是加读锁-复制-解读锁-执行读取,也就是采用读写间隔模式,假设c先唤醒,然后c加锁,然后读到的版本号为k,然后解读锁,然后正好此时另一个goroute加写锁提交解写锁,因为版本号是递增的,所以此时etcd的最新版本号肯定大于k的,假设为k+z,然后a请求开始执行,加读锁-读取版本号为k+z,然后解读锁,假设c读取key=k1这个key,然后a也读取key=k1这个key,版本号k+z必定大于k,可以肯定的是如果c读到了版本号为k的数据,那么在他后面的a也必定能读到版本k的数据如果有更新的数据,那么就是读更新的数据,即一个数据已经读到,那么他后面的所有请求都能读到这个数据后者这个数据的更新的值

!!!serilizableRead最终也是通过事务来实现的,也就是说etcd不管是读还是写最终都是通过事务来执行

etcd raft range:etcd不管是读还是写最终都是通过事务来执行,如果没有事务,就会包装一个事务,都分为两步,第一步是raft流程,第二步是执行事务,raft流程中如果是写操作则需要写同步日志,而如果是readIndex读则不是同步日志,而是同步状态,第二步执行事务时,如果是读就封装读事务,如果是写就是写事务


v3_server.EtcdServer.Range                       #就两步:1:先阻塞,直到其他线程通知他可以读;2:去数据库读取最新数据if !serilizable{                               #serilizable表示直接读leader,!serilizable表示ReadIndex即线性一致性读#ReadIndex读就是先等待,然后直到满足readIndex读条件之后再进行serilizable读#具体步骤就是:1:就是leader首先确认自己此刻是不是leader,#因为有可能网络分区等原因导致leader实际不是leader#2:如果是,那么当前leader此刻的commitedIndex就是此读请求对应的ReadIndex#当本节点的 applyIndex>=readIndex时这个readIndex读请求就可以执行serilizable读了v3_server.EtcdServer.linearizableReadNotify  #执行等待,直到appliedIndex>=ReadIndex后才去数据库读数据#这个函数的逻辑很简单:#写readWaitc这个chan来通知另一个goroute有人要readIndex读#通知完后本线程就等待readNotifier这个chan即等待其他goroute通知#当另一个goroute准备好后就会写readNotifier这个chan来唤醒当前请求#goroute是很轻量的,所以直接阻塞就行,不停的来请求,不停的创新的goroute就行nc := s.readNotifier                       #获取通知chan#当readIndex准备好的时候其他goroute就会写这个notifyChan来通知本gorouteselect:case  s.readwaitc <- struct{}{}          #发消息到readwatic chan 来通知linearizableReadLoop函数有人需要ReadIndex#readWaitC的容量大小为1deafult:                                 #如果readWaitc满了,导致写失败,那么就直接跳过此步,继续往下走#也就是说写readWaitC是一个非阻塞操作,不管有没有写成功,都会继续往下走select  case <-nc.c:                             #通知完readWaitc后当前goroute就阻塞在这个readNotifier直到被通知---------------------------------------->another thread 1:                          #当过半节点都承认leader节点有效地时候,#thread1 会通过写chan来通知上层程序leader当前是有效的,即leader的commitedIndex是有效地#后续上层程序只需要等待appliedIndex>=confirmIndex即可#confirmIndex即请求到来时leader的commitedIndexetcdserver.EtcdServer.linearizableReadLoop #一个死循环,获取此刻最新的commitedIndex,通过chan接受上层发来的ReadIndex请求,#然后也通过chan把处理结果返回给发请求的线程for:                                    #就是一个死循环,监听某个chan,如果收到上层发来请求就唤醒,#然后创建一个chan,发给其他线程,然后等待其他线程发回结果idutil.Generator.Next                 #为本轮即将到来的请求先提前生成一个唯一reqid,后续用来检索select:case <-leaderChangedNotifier:       #如果等待期间leader变了 continue                          #则放弃本轮循环,直接开始新的一轮循环case <-s.readwaitc:                 #从readWaitc收到其他线程发来的ReadIndex的请求。一次for循环处理一个读请求  case <-s.stopping:                  #如果etcdserver停止了,那么退出循环结束returnnextnr := newNotifier()               #创建一个新的notifys.readMu.Lock()          nr := s.readNotifier                  #获取旧的notifys.readNotifier = nextnr               #用新的notify替换掉旧的notify,#本轮我们会处理旧notify,并且新请求会挂在新notify下面s.readMu.Unlock()                     #!!!这里的逻辑是这样的:#!!!我们用a代表linearizableReadNotify,b代表linearizableReadLoop#!!!用rc代表readWaitC,用nc代表notifier#!!!所以流程就简化为a收到客户端发来的请求req,然后写rc通知b,然后在nc上阻塞#!!!然后b收到通知后就处理请求,处理完后唤醒nc上等待的所有a(一个req对应一个a)#!!!因为b是串行处理即从rc取一个请求然后处理,处理完后才会从rc读下一个请求#!!!而且a可能同时收到很多请求,假设有req1,req2,...,reqK,...,reqN#!!!再加上rc的容量是1,所以会导致这样一种情况:#!!!a收到reqK,然后发给b,b从rc取出reqK,然后a写reqK+1到rc会成功,#!!!b然后处理reqK,此时a仍然不停的收到新的请求#!!!这样就会导致reqK+1之后的请求都会写rc失败,但是a即使写rc失败#!!!也会阻塞在nc上,当a收到请求reqN时,此时b刚好处理完,开始下一次循环#!!!b会从rc读取reqK+1,然后rc空了,a就会写reqN成功#!!!当b处理完reqK+1的时候下一个处理的请求就是reqN了#!!!也就是说当b处理完reqN的时候不能仅仅通知reqN对应的请求#!!!还必须唤醒所有在reqK+2到reqN之间的所有阻塞在nc上的请求#!!!所以解决思路就是:用n1、n2两个nc,并且交替处理。#!!!就是说处理n1的时候,所有后续到来的请求都阻塞在n2上#!!!然后n1处理完后处理n2,然后在处理n2的时候,就让后续到来的请求都阻塞在n1上#!!!这样我们处理完n1的时候直接n1.notifyAll就能唤醒所有之前阻塞在n1上的请求#!!!同理:处理完n2的时候直接n2.notifyAll就能唤醒所有之前阻塞在n2上的请求                                    #!!!这个解决思路底层依赖于这样一个原理:#!!!一旦readIndex读请求reqN处理完了即达到可进行readIndex读的时候#!!!reqN之前的所有readIndex读请求一定都满足readIndex读的条件#!!!具体点说就是后到来的请求y的readIndex必定大于在他之前的请求x的readIndex#!!!即当applyIndex>=readIndex_y的时候必定有applyIndex>=readIndex_xv3_server.EtcdServer.requestCurrentIndex      #获取此刻commitedIndex的值并保存到一个叫confirmIndex的变量中#注意,一个notifer下面会挂一大串请求,但是他这里只需要请求一次就行#因为请求等待的readIndex不会大于此刻的commitedIndex#所以当applied>commitedIndex时表示所有readIndex<commitedIndex的#读请求的一致性要求都可以满足#从而他会一次性唤醒所有readIndex<此刻commmitedIndex的读请求etcdserver.EtcdServer.sendReadIndex         #获取最新commitedIndex,会同步等待,直到ok或者出错raft.node.ReadIndex                       #通过向raft状态机发送一个MsgReadIndex消息来获取#丢完消息后就返回,让他异步去处理raft.node.step(pb.MsgReadIndex,reqid)   #构造一个MsgReadIndex消息#前面生成的reqid作为数据部分放在消息的data字段中,然后处理raft.node.stepWithWaitOptioncase n.recvc <- m                   #把前面构造的的MsgReadIndex消息发到recvc然后此处就返回了#让其他goroute异步走一遍stepLeader或者stepFollower(根据节点角色决定)#这个recvc就是专门收发其他节点发来的消息,当然也可以自己发给自己#读取实践中有三种方式:1:Log(每次读也写一条日志)#2:readIndex:就录一个commitedIndex,#直到appliedIndex>=记录的commitedIndex#3:直接从本地读,不经过leader#log方式太慢了;readIndex还是需要一轮广播;直接本地读,不安全for:                                        #这是一个死循环,其他线程处理完MsgReadIndex消息后#会通过填充readStateC chan来解除死循环#这个for循环是上面那个requestCurrentIndex函数里的#requestCurrentIndex函数会阻塞,#直到node.run中把readState中的chan发给他来唤醒它select case rs := <-s.r.readStateC             #阻塞在readStateC上,其他线程处理完MsgReadIndex消息后,#其他线程会把处理结果写到这个readStateC中来跳出循环#linearizableReadNotify这个死循环有三种结束等待的方式:#1:超时或者error结束等待;2:readStateC;3:notifier#一个notifier对象可能对应1批ReadIndex请求,#只要这一批有一个请求完成了,#那么他完成时会通知本批次所有请求都结束等待return rs.ReadIndex                   #到达这里说明该请求已经被批准了,此处返回结果  case <-firstCommitInTermNotifier        #收到了当前任期第一次提交发来的通知。#即当客户端发来ReadIndex的时候本leader才刚获得leader资格#在他的这个任期内集群还没有发生过commited事件,#所以必须等待,假设旧leader提交到了x+3然后崩溃#然后新leader当选,因为此时集群变了比如旧leader崩溃了,#导致没有过半节点到达x+3,#那么新leader就不能从x+3开始提交#需要重新确定commited,这是一个不断尝试的过程,#也就是说这是一个不断变化的过程,#所以在新leader确定commited之前不能读取,#所以新leader第一次提交之前到来的请求#都需要在新leader第一次提交之后重新尝试#即重新丢一个MsgReadIndex到raft重做一遍etcdserver.EtcdServer.sendReadIndex time.Timer.Reset                      #重置定时器case <-retryTimer.C:etcdserver.EtcdServer.sendReadIndex time.Timer.Reset case <-leaderChangedNotifier            #如果leader变了,则放弃所有读请求,并返回错误returncase <-errorTimer.C                     #超时,返回错误return ....后面的流程此处略,后面再补充....  ---------------------------------------->another thread 2:                   #node.recvc收到etcdserver发来的ReadIndex请求即发来的MsgReadIdnex消息raft.node.runfor:case m := <-n.recvc           #etcdserver发来的MsgReadIndex消息raft.raft.Step//if role==leader:          #如果当前节点角色是leader则走stepLeaderraft.stepLeadercase pb.MsgReadIndex    #处理思想就是:走一遍heartbeat流程。#如果heartbeat流程中有过半节点拥护当前节点,那么当前节点就是有效地leader#那么此leader当前的commitedIndex就是此请求对应的ReadIndex#即后面说的变量confirmIndex#对MsgReadIndex消息的处理流程如下:#1:用一个map acks保存所有节点对该ReadIndex的投票情况,map的key是节点id#2:发送MsgHeartbeat消息给所有节点#3:收到一个MsgHeartbeatResp时不但要标记该节点x是活跃的,#还要同时令acks[x]=true即认为该节点是赞同当前leader和ReadIndex的if !raft.raft.committedEntryInCurrentTerm   #如果当前leader在任期内还没有提交过日志,#那么就直接挂起这个ReadIndex,然后直接返回#因为在处理完一个ReadIndex时会同时唤醒所有index#在他之前的所有ReadIndex请求,#所以这里可以安心挂起,因为后续的ReadIndex会唤醒它#本文后面会解释append(r.pendingReadIndexMessages)        #挂起即把请求放到一个pending数组,然后直接返回,不管这个请求了return raft.sendMsgReadIndexResponse               #发送heartbeat消息给所有peer节点#两步:1:leader自己给自己投一票;2:发消息给followercase ReadOnlySafe:                        #ReadOnlySafe表示ReadIndex读#safe的含义是走一遍quorum投票,只有过半节点赞成才会执行raft.readOnly.addRequest(r.raftLog.committed, m) #保存此刻的commitedIndex以及本次请求#!!!此刻的commitedIndex就是本次请求对应的readIndex#MsgReadIndex消息的数据字段包含了本次ReadIndex的reqid#当ReadIndex处理完毕后那么保存的这个commitedIndex值#就是confirmIndexro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}#这里就是把req以及对应的投票信息保存到一个map中#key=reqid,value=投票信息ro.readIndexQueue = append(ro.readIndexQueue, s) #readIndexQueue保存了所有readindex读请求#并且请求是按顺序保存的,所以当x满足readIndex条件时#队列中所有在x之前的请求必定满足readIndex条件raft.readOnly.recvAck(r.id, m.Entries[0].Data)   #消息的Data字段实际就是ReadIndex对应的reqid,#r就代表本节点,这里就是当前节点默认是投自己一票#!!!只有leader才会走到这里,follower会直接丢给leaderro:=pendingReadIndex[reqid]           #获取reqid对应的投票信息ro.acks[id]=true                      #对于reqid对应的这个ReadIndex,leader肯定是表示支持的#只有活跃节点才会放到这个map acks中,#如果后续检测到这个map中有过半节点数#那么就认为reqid对应的ReadIndex被批准了,#就可以通过chan来通知上层可以去数据库读数据了r.bcastHeartbeatWithCtx                 #广播heartbeat消息给所有peer节点                                    #follower节点对heartbeat消息的处理很简单,#就简单返回本身的commitedIndex给leadercase ReadOnlyLeaseBased                   #ReadOnlyLeaseBased表示LeaseRead即副本读,即采用了租约#当一个领导者被选举出来时,它会获得一个租约,#这个租约保证了在一定的时间窗口内,#集群可以基于这个领导者提供一致性视图来处理只读请求#但是leader可能并不是真的leader,所以租约读是不安全的//if role==follower:                                   #如果当前节点是follower,则直接把请求丢给leaderstepFollowerswitch:case pb.MsgReadIndex:m.To = r.leadr.send(m)  ---------------------------------------->another thread 3:                               #thread 3处理MsgHeartbetaResp消息,即follower发回来的响应#如果有过半节点承认reqid对应的ReadIndex就通知上层当前leader是有效的,#reqid对应的ReadIndex请求可以结束等待了raft.node.runfor:case m := <-n.recvc                       #peer节点发来的MsgHeartBeatRespraft.raft.Stepraft.stepLeadercase pb.MsgHeartbeatRes             #对MsgHeartBeatResp的处理主要包括三步:#1:标记该节点x是近期活跃的#2:标记reqid对应的acks[x]=true#3:如果过半,则写chan来通知上层可以结束对该reqid对应的ReadIndex的等待了progress.RecentActive=true        #1:标记该节点是近期活跃if pr.Match < r.raftLog.lastIndex #follower节点会把自己的commitedIndex告知leader节点,#此处发现follower节点落后了,所以发送MsgApp通知他追赶  raft.raft.sendAppendraft.readOnly.recvAck(perrId,reqid) #2:标记reqid.acks[perrId]=true即该peer节点支持leaderquorum.JointConfig.VoteResult       #3:计算投票结果,即看reqid对应的acks map中是否有过半数节点#前面说过,acks map保存的是所有活跃的且承认当前节点是leader的节点#如果有则通知上层linearizableReadLoop reqid对应的读请求可以结束等待了#就是一个count,看是否过半rss=raft.readOnly.advance(m.Index)#从等待队列移除所有index在m.Index之前的所有pendingRequest,#会把满足要求的pendingRequest放到一个叫做rssd的数组中#我们前面把reqid和一个commitedIndex(假设值为x)绑定在一起#当x可以结束等待时,那些commitedIndex小于x的ReadIndex请求肯定可以结束等待了raft.raft.responseToReadIndexReq #根据rss中的请求构造MsgReadIndexResp消息,#这个MsgReadIndexResp消息中包含了reqid对应的ReadIndex值#即当时的commitedIndex值#如果消息来源是follower,则把MsgReadIndexResp消息发给follower,#follower再把该req放到readState中(readState用来保存当前已经批准的的读请求)#然后会把readState中的元素发到指定的r.readStateC,#linearizableReadLoop 每次循环就是在等待这个readStateC#我们通过前面的步骤已经确定了该req对应的读请求所等待的commitedIndex值,#因为客户端如果请求的是follower节点,follower节点会把请求转发给follower,#leader会把批准的ReadIndex值放到这个MsgReadIndexResp中(假设用变量confirmIndex表示),#这样后续当follower节点发现本机appliedIndex>=confirmIndex时#就可以遍历readState中的所有读请求,#凡是req.confirmIndex<=appliedIndex的读请求都可以解除阻塞#如果消息来源是leader自己,一样的把他加到leader节点自己的readState数组#在node.run在下一次循环中会检测到readState不为空,然后就触发case rd<-r.Ready()if req.From == None || req.From == r.id {        #如果是自己发给自己的r.readStates = append(r.readStates, ReadState{ #直接把ReadState丢到leader自己的readStates中#后续会把readStates里的数据丢到readStatesCIndex:      readIndex,RequestCtx: req.Entries[0].Data,})return pb.Message{}}return pb.Message{                               #如果是follower发来的ReadIndex读请求Type:    pb.MsgReadIndexResp,                  #那么就返回MsgReadIndexResp消息给followerTo:      req.From,Index:   readIndex,                            #readIndex是req创建时的leader的commitedIndexEntries: req.Entries,}if req.to!=None:raft.raft.send(pb.MsgReadIndexResp)              #把MsgReadIndexResp响应消息返回给follower---------------------------------------->another thread 4:                           #上面已经把批准的ReadIndex请求放到readState了,#然后readState不为空会被node Ready()检测到#然后raftnode.run会把readState最后一个元素发到指定的r.readStateC以激活下一步#在他之前的必定满足要求,所以发送最后一个就行了raft.raftNode.runfor:selectcase rd := <-r.Ready()    if len(rd.ReadStates) != 0 {   case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1] #发送最新的readState到指定chan,激活相关线程,#因为他是不断循环的,只要readState不为空,那么就会继续ready,#继续处理,直到为空---------------------------------------->//这里又回到another thread 1 中的etcdserver.EtcdServer.linearizableReadLoop函数etcdserver.EtcdServer.linearizableReadLoopetcdserver.EtcdServer.linearizableReadLoop   ......                                        #当requestCurrentIndex返回后,就可以获取此刻得appliedIndexetcdserver.EtcdServer.getAppliedIndex         #获取当前的appliedIndexif appliedIndex<confirmIndex                  #如果还没有apply到confirmIndex#即读请求到来时的有效commitedIndex值就继续等待case <- wait.timelist.Wait(confirmIndex)    #继续阻塞,直到etcdserver.EtcdServer.applyAll线程#在完成一此apply操作后主动唤醒所有appliedIndex之前的读请求#这个Wait会创建一个chan,#applyAll唤醒它时调用close(ch)来填充这个chan,来结束阻塞etcdserver.notifier.notify                    #当属于同一个notifier的一批请求中的某个被批准的时候#会唤醒所有在等待这个oldnotifier的读请求#即会唤醒linearizableReadNotify goroute,#一个请求对应一个linearizableReadNotify goroute#所以这里一次就会唤醒很多歌gorouteclose(oldnotifier chan)                       #close(chan)会唤醒所有等待这个chan的线程---------------------------------------->//这里就回到linearizableReadNotify v3_server.EtcdServer.Range  etcdserver.EtcdServer.doSerialize                  #serializeRead指直接从从bbolt数据库读取数据#而ReadIndex读则相当于在serializeRead之前增加了一个wait操作,#直到appliedIndex>=commitedIndex#线性读要求读最新数据,这里就直接去数据库读了,#doSerialize就是LeaseRead,#当ReadIndex读请求被放行以后就执行LeaseRead#这个serialize就是调用txn.Range来读取#即serialize是一个读事务apply.applierV3backend.Rangeif txn == nil:                                 #如果事务为空txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode) #则创建对应的conncurentReadTx事务对象......backend.readTx.buf.unsafeCopy            #conncurrentReadTx会复制一份readTx的readBuf#因为applierV3backend.Range会在txn中调用,即op= "Range"#也可能直接调用,所以txn可能为空也可能不为空...rev=s.currentRev                          #本次读事务看到的版本号就是此刻etcd最新的版本号defer txn.End()metrics_txn.metricsTxnWrite.Range              #doSerialize就是LeaseRead,当ReadIndex读请求被放行以后就执行LeaseRead#这个Lease Read就是调用txn.Range来读取kvstore_txn.storeTxnRead.Rangekvstore_txn.storeTxnRead.rangeKeys(tr.Rev())#读取revision版本不超过tr.Rev的key即只能读到事务开始前就完成的key#笔记:apply之后只是把writeTxn请求丢给底层的batchTxBufferd就返回了#然后readbuf此时是没有这些新数据的,也就是此时读还是只能看到旧版本的数据#当batchTxBuffered提交以后就会更新readBuf并更新s.currentRev#此后的事务就能看到最新的版本号以及从readBuf读到最新的数据了revPairs=index.treeIndex.Revisions(key,end,atRev) #从treeIndex获取对应的revision#key表示要查找的key范围的起点,end表示key范围的终点,#atRev表示版本号,即不会读取atRev之后的版本的数据#举个例子:#key为a的数据有修改了三次,对应三个版本号:rev1=3 rev2=6 rev3=10#假设事务开始时的rev=8即atRev=8#那么就只会读取该key的rev小于8的最新的数据即rev=6if end==nil:                           #end=nil表示本次只读取一个keyindex.treeIndex.Get(key,atRev)  keyi := &keyIndex{key: key}index.treeIndex.keyIndex(keyi)     #从treeIndex中获取key对应的keyIndex结构,treeIndex是一棵b树#etcd所有key的keyIndex都会放在内存,这也限制了etcd支持的数据集大小key_index.keyIndex.Get             #从key对应的keyIndex中获取数据对应的revkey_index.keyIndex.findGeneration#先从keyIndex中获取generationkey_index.keyIndex.walk          #一个generation中可能多次修改key,所以generation可能含有多个版本号#这里就是选一个不超过且最靠近atRev的版本号,包括rev包括(main,sub)#获取了(main,sub)后就唯一确定了bbolt中的一个数据for   revPairs:                          #遍历获取的所有rev对即(main,sub)对revision.revToBytes(revpair, revBytes) #把(main,sub)转换成key,即bbolt也是kv,只不过key=main_subbinary.BigEndian.PutUint64(bytes, uint64(rev.main))bytes[8] = '_'binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))readTx.baseReadTx.UnsafeRange          #读取操作很简单:1:先尝试从readBUf读tx_buffer.txReadBuffer.Range         #这里就是baseReadTx.buf.Range即从readBuf读取if int64(len(keys)) == limit:        #如果从readBuf读到了指定数量的数据,那就直接返回,否则就要去读bboltreturn keys, valsbatch_tx.unsafeRange                 #readBuf中没有再去bbolt读取#笔记:读过的数据不会丢到readBuf中,readBuf中保存的是最新commit的数据

stepFollower对leader发来的heartbeat消息的处理:

stepFollower:case pb.MsgHeartbeat:                                                  #leader发来msgHeartbeat消息mr.lead = m.From                                                      #m.From表示leaderraft.raft.handleHeartbeat                              log.raftLog.commitTo(m.Commit)                                     #follower尝试把自己的commitIndex提升到m.Commit#m.Commit表示leader此刻的commitIndexif l.committed < tocommit {                                      #如果follower的commitIndex a <leader的commitIndex bif l.lastIndex() < tocommit {                                  #并且如果follower的最大日志索引还没有到达bl.logger.Panicf(                                             #则说明follower出了问题,可能是落后了  "tocommit(%d) is out of range [lastIndex(%d)].           #这里打印日志,然后直接杀掉followerWas the raft log corrupted, truncated, or lost?",        #估计会有recover重启follower吧tocommit, l.lastIndex())}l.committed = tocommit                                         #否则把follower.commitIndex设置为leader的commitIndex b}raft.raft.send(pb.Message{                                         #发回对leaderHeartbeat的响应To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})case pb.MsgReadIndex:                                                #如果当前节点是follower并且收到ReadIndex读请求                r.send(m)                                                          #那么就直接把请求转发给leadercase pb.MsgReadIndexResp:                                            #如果收到leader发来的对readIndex读请求的响应#注意:leader会等待readIndex读请求条件满足时#才会发送响应MsgReadIndexResp给followerr.readStates = append(r.readStates,                                #follower收到后就把对应的数据丢到readStates中ReadState{                                   #然后就会激活follower上阻塞的readIndex读请求Index: m.Index,                            #后面就是serilizable读了RequestCtx: m.Entries[0].Data})

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com