笔记:v3.5和v3.6还是有许多不同的,但大致流程还是一样的
!!!本文的a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名
go可以成千上万的goroute并发,所以每个请求都是同步请求即propose,然后select,直到超时或者propose成功
etcdServer.put->s.raftNode.node.stepwait->n.proc(这是chan)->node.run->(case <-proc)->node.rawNode.raft.Step->raft.setp.case_default->raft.step(前面是Step,这里是内部的step)->case_propose->raft.appendEntry+raft.bcastetcdserver收到请求,请求需要先propose,然后etcdserver则把请求转发给node要求node完成propose任务,node是一个状态机,负责决策,根据请求的当前状态来决定下一步干什么,node只负责角色,具体操作由raft完成,也就是说raft提供了各种能力即负责执行,但是raft什么时候执行哪个操作则由node根据请求的当前状态来决定,换个角度理解,etcdserver看到的raft 节点是node,而raft协议层面看到的节点则是raft。
存疑:unstable、memstore、snapshot三者lastindex的关系
梳理一下batchtx、writebuf、readbuf、bbolt之间的关系
batchtx内部保存了一个bbolt对象,因为etcd是读操作可以从readbuf中读取不用每次都从磁盘查(etcd是所有数据的索引都会放在内存,但是数据不一定可能在磁盘),从而加快读取,put时会先把数据写入bbolt,但是此时是没有提交的,然后写入bbolt之后再写入writebuf,因为batchtx是批量提交,也就是攒够1w条后会进行提交(肯定有异步goroute会定时提交),提交的时候则会调用内部bbolt.tx.commit来提交bbolt事务,事务提交以后在会调用txn.End,在batchTxn的End函数里会把writebuf里面的内容写到readbuf,然后清空writebuf,也就是说etcd是先从readbuf取数据,取不到再去读磁盘,batchtx把数据写到bbolt之后但是没有提交之前,对外不可见,所以这些数据就不会写到readbuf中,当攒够一批事务或者定时提交时间到了,batchtx调用bbolt.commit来提交事务,提交事务后会把提交的数据丢到readbuf,因为此时会写readbuf,所以需要对readbuf加锁,因为batchtx是批量提交,每次提交的事务会很多,所以不应该提交一个事务就修改一次readbuf,因为这要求对readbuf大量加解锁,效率就低了,所以就用了一个writebuf,先把所有提交的数据写道writebuf,等事务结束的时候一次性写到readbuf中,然后再释放readbuf
a.b.c表示a文件里的b类的方法c,注意a不一定是包名,因为文件名不一定等于包名
put源码流程:
------->1: etcdserver put
etcdserver.EtcdServer.Put #客户端调用Put grpc会走到这里,#客户端都是通过调grpc来和服务器通信,不同的请求对应不同的rpc函数#因为put会修改集群数据,所以写入数据库之前必须先走一遍raft流程来同步日志etcdserver.EtcdServer.raftRequest(pb.InternalRaftRequest{Put: r})#把普通请求封装到raft请求中#raftRequest的意思表示etcdserver要使用raft方式完成此请求#put操作要求propose成功并且被apply之后才能返回ok给client#此处raftRequest只是第一步,put会直到数据异步apply成功后才会返回#这个Put:r表示把当前请求放到raft请求的Put字段,#这个put字段属于raft请求的数据字段(data)#当然,还有Delete/Range/Txn/Compact字段,#但是同一个请求有且只有一个字段不为空即一个请求只做一件事#raft同步日志过程中,#只关心日志的元信息比如日志索引、任期,不关心不使用这个数据字段#这个data字段有两个作用:#1:持久化wal日志时作为日志的一部分被写入磁盘,从而做好了数据备份,#当崩溃时数据不会丢失,重启时重放一遍日志就行#2:apply的时候被解析,#data字段是一个完整的请求,包含了数据和请求类型等元信息,#apply阶段从data字段解析出请求r后就根据Put字段不为空从而判断出这是一个put请求,#从而可以正确分发请求(调用不同函数)#所以不管是什么请求,这个raft同步日志的流程都是通用的,#因为raft流程完全不涉及data字段etcdserver.EtcdServer.raftRequestOnce #以raft方式完成此请求etcdserver.EtcdServer.processInternalRaftRequestOnce #干三件事:1:检测是否接受该请求。如果达到阈值就停止接受新请求#2:准备好raft所需的日志数据(把客户端请求(put/delete/txn等)#序列化以后封装到raft日志的data字段)#3:进行propose然后等到其他goroute成功apply数据etcdserver.EtcdServer.getAppliedIndex #获取已经应用的日志的索引(即appliedIndex),应用成功表示数据已经写入数据库etcdserver.EtcdServer.getCommittedIndex #获取已经提交日志的索引(即commitedIndex),提交成功表示该数据日志已经同步到其他节点了#把一条数据写入etcdf服务器需要两步:#1:同步日志到其他节点,一条日志包含两部分(日志元信息,原始请求);#2:写数据到本机bbolt数据库,etcd底层用的是bbolt数据库存储数据#(原始请求中包含数据)#同步日志:即commit操作。#commit操作即是把日志同步到其他etcd节点,一旦过半节点同步成功则该日志就变成commited了#写数据:即apply操作即成功把数据写入底层boltdb数据库#!!!apply操作是各搞各的,leader与follower之间不会为此进行通信和同步,#不用担心崩溃恢复的问题,原因如下:#apply一条日志之前该日志必定是commited,即成功写入了本地磁盘并且同步到了过半节点上,#所以崩溃后日志还是在的,数据还是在的#崩溃重启时,只要从0开始重新apply一遍所有日志(相当于mysql redo), #就能使数据库恢复到一个确定的状态#???待补充:因为etcd是readIndex,所以不用担心从follower节点读时落后的问题if appliedIndex+limits<commitedIndex #如果apply一条数据花费的时间长,那么就可能导致appliedIndex远远滞后于commitedIndex#一旦超过一个阈值,就拒绝本次put即此时不再接收新请求#每条日志都有一个索引表示这是第几条日志,#etcd中如果index=x的索引还没处理完就不会处理index=x+1的日志#整个日志索引空间分两大块:持久化了的和unstable,#unstable表示这部分日志还在内存,断电会丢失#日志索引分3大段,applied(已经可以访问即写到了数据库),#commited(已经提交了日志但还没有写到数据库),uncommited(还未提交日志)#applied:[0,x] commited:[0,z-1] uncommited: [z,+∞),#x必定小于等于z-1即必须先提交后应用#还可以分为stable(持久化到了磁盘),unstable:还在内存,断电丢失#stable:[0,k-1],unstable:[k,+∞),#如果allowUseUnstable(可配置,即允许commite和apply未持久化的日志)#那么x/z都可以大于等于k,反之都必须小于k#有个InProgress表示正在进行持久化的数据即inprogress之前的已经持久化了,#之后的分两部分:正在持久化和还未开始持久化#还有个applying表示正在应用中的日志即applying:[x+1,applying],#applying要小于等于commited#!!applying和commiting时日志可以还没有持久化(可配置allowUseUnstable),#因为持久化和commited/apply操作都是异步的#不过一般来说commitedIndex要小于unstable的第一条日志的索引(默认配置是这样)#关于日志还有另一种分法,即snapshot,memtable,unstable#snapshot[0,a]:这部分日志已经持久化了,并创建了对应的snapshot文件,#该文件记录了当前的appliedIndex值即a,重启后会从a开始重放#memtable[a+1,b]:这部分日志已经持久化了,但不属于快照,会在内存创建一个对应的数组储存#ustable[b+1,+∞):这部分还没持久化return #applyIndex+limits<commitedIndex,表示落后程度达到了阈值,所以直接拒绝新请求idutil.Generator.Next #为该请求生成一个唯一id,会把这个id和一个chan对应起来,#当异步完成propose流程后会通过这个chan通知此gorouteetcdserverpb.InternalRaftRequest.Marshal #序列化raft请求的数据部分,后续会被放到一个msg对象中,然后准备发往其他etcd节点wait.list.Register(id) #为前面生成的id生成一个chan,假设叫做ch,会这个ch加到的内部的map中即id->chan#这样我们就把(req,id,chan)三者联系起来了#当前goroute会阻塞在这个ch chan上以等待put结果即等待数据apply成功#当日志同步完成后其他goroute会写这个chan来通知当前线程put完成#收到结果后就返回给用户了raft.node.Propose #调用node接口来通知底层的node进行propose#etcdserver看到的节点是node,对于etcdserver来说,node就是一个raft层面的节点#所以etcdsever把propose这个任务交给node去完成,etcdserver只要等待结果就好#node.propose方法就是写node.propc这个chan#来通知node的另一个goroute来进行propose#propc是propose chan的缩写,c表示chan#node的这个goroute就是node.run这个函数,这就是一个状态机#这个状态机有个switch ,处理不同的chan上的消息#raft同步日志流程,就是一个2pc节点,即包括两个阶段:propose和commited#propose就是leader通知各个follower来了一条新日志,你们给我保存一下#commitd就是leader检测到一些日志可以提交了(即检测到该日志已有过半follower同步成功)#那么leader就先本地提交然后发消息通知各个follower提交这些日志#笔记:因为follower可能失败,所以才需要先收集是否有过半节点可以成功raft.node.stepWait(pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) #创建raft消息并把消息发送给node的状态机goroute#消息类型为MsgProp即代表这一步是propose#不管什么请求(put/delete/txn/compact等),原始请求都会作为raft请求的data字段raft.node.stepWithWaitOption #这个函数只处理Propose消息,对于其他类型的消息则直接发到recvc chan,#由对应线程来处理,然后直接返回if m.Type != pb.MsgProp:case n.recvc <- m #非Propose消息直接转发给node的recvcreturnraft.node.propc <- msg(pb.MsgProp) #把包含了日志的Propse消息发往指定chan,即raft.node.run这个线程去异步proposecase err := <-pm.result #阻塞,直到propose完成并把结果填充到resultchan中------->2: 请求发到node.propc chan #------>另一个goroute,node.run,即raft状态机流程来到另一个goroute即node.runraft.node.run #node.run函数相当于raft状态机,里面会有一个死循环不断select,select中有很多chan#每个chan对应一个状态,每个chan都有一个配套的动作#上一步我们是发送给了node.propc这个chan,那么配套的动作就是propose即发起提议#propose处理流程分2步:#1:leader先在本机上把该日志持久化(实际是异步操作,只不过会默认持久化成功并更新相关变量)#2:发送MsgApp消息通知follower节点保存该日志#之所以本机上也做是因为leader本身也算作一个节点,最终投票时leader本身也要投一票,#所以leader本身也要做follower所做的事#向follower节点发送的MsgApp日志同步消息会放到发送缓冲区由其他线程发送,#并乐观的更新leader看到的follower节点视图(即pr.next)#即消息还没发给follower,就默认follower会成功。就算失败也没关系,#因为检测的是pr.Match这个字段而不是pr.next, #如果指定时间没有过半节点的pr.match字段达到日志对应的索引,#那么leader就知道这个失败了,就会进行相应处理for{...略,后面会细讲... #略去其他代码,此处我们直接来看propc对应的propose动作select:case <-raft.node.propc #当收到etcdserver发来的日志同步请求时执行这里#做两件事:1:调用StepLeader发起propose;2:等待,直到收到propose的结果raft.raft.Step #node调用raft.step来执行下一步#node与raft关系:node是大脑,raft是执行器,#也就是说raft提供了各种能力比如写wal持久化、网络通信等能力#然后node就是根据当前请求的状态来判断该执行什么操作#node只负责决策,不负责执行,实际执行时node就通过调用raft的各个函数来实现#节点有三种角色(leader/candidate/follower),#对应stepLeader/stepCandidate/stepFollower函数,根据角色调用对应的函数#这里以leader为例,因为etcd只允许leader发起propose,#所以当前节点必定是leader,如果是follow,那么会把propose请求转发给leaderraft.stepLeader: #leader propose主要执行两个操作:#1:把日志写入unstabel;#2:发送MsgApp消息给follower,日志会放在MsgApp消息中case MsgProp: #这里处理propose消息,leader的日志追加操作对应的是etcdserver层发来的MsgProp消息,#而非leader的日志追加操作则对应的是leader发来的MsgApp消息,#再次强调:只有leader才可以proposeraft.raft.appendEntry #本地添加日志,只是添加到内存,并不会立即持久化,是由另一个goroute异步进行持久化#!!!不用担心还没持久化就崩溃导致数据丢失#在关闭allowUseUnstable的情况下etcd只会commite已经持久化的日志,#所有还在unstable中的日志必定是还没有持久化的,#还没有持久化的日志必定不会commite和apply,#所以必定不会返回成功的结果给客户端,所以客户端肯定也知道本次请求还没有成功#所以客户端可以选择稍后重试直到etcd集群恢复正常,#所以即使断电也没关系,unstable中的日志丢失就丢失了,完全不用担心#!!!补充一下:发起propose、commit日志、apply日志是三个goroute#!!!他们都是互相异步的,也就是说commit日志(即写入wal日志)/apply日志时#!!!不是说a goroute通知commit/apply goroute去同步某条日志#!!!而是说a goroute搞完以后更新一下proposedIndex和commitedIndex然后就返回了#!!!这样wal goroute会异步不断的写日志到磁盘,直到达到最大的值#!!!apply也是一样的,也就是说wal goroute会落后于proposed#!!!apply也会落后于wal即commite#!!!这里的落后是指:索引x处的日志已经proposed完成完了,#!!!但是wal刷盘线程可能还没写到那个位置,也就是落后于proposed#!!!索引y处的日志已经wal写到磁盘了,但是还没有apply,即apply落后于commitelog.raftLog.lastIndex #获取最后一条日志的索引,#然后本次新增的n条日志的索引就依次为lastIndex+1、lastIndex+2,...lastIndex+n#可以把整个日志空间看成一个逻辑数组,#第0个元素的日志索引就是0,第x条日志的索引就是x,每次追加日志都是追加到数组末尾#然后unstable就是这个逻辑数组中位置x到+∞的这部分索引对应的日志#即unstable就是整个数组的后半段log_unstable.unstable.maybeLastIndex #尝试返回unstanble的最后一条日志的indexif l := len(u.entries); l != 0: #如果u.entries不为空则返回u.offset+len(u)-return u.offset + uint64(l) - 1 #u.offset表示u的其实offset,因为整个索引空间是0~+∞#u只保存了u.offset~+∞,即最后这一段if u.snapshot != nil : #如果len(u)==0,则返回snapshot的indexreturn u.snapshot.Metadata.Index storage.MemoryStorage.LastIndex #如果snapshot也是空,则返回memstore的lastindex#memstorage中保存的都是持久化了的日志#所以说mem.lastIndex<=snapshot.lastIndex<u.lastIndexraft.raft.increaseUncommittedSize #计算未提交数据总大小,#如果算上本次准备提交的日志数据字节数#超过了系统允许的未提交数据的字节数#那么就拒绝本次提交,以免未提交数据太多,一旦断电,影响太大#比如由1G的数据还没提交,然后断电,丢失,#client又要花时间花资源重做这1G数据对应的请求,代价太大log.raftlog.append #这里把日志添加到unstable数组的末尾,由其他线程异步持久化到磁盘if after := ents[0].Index - 1; after < l.committed : #如果新增日志的索引号小于已提交最后一条日志的索引号,#所以发生了不可修复错误,直接打印panic日志log(panicLevel,xxx)raft.unstable.truncateAndAppend #这里把日志添加到unstable数组的末尾,并且会truncater.prs.Progress[r.id].MaybeUpdate(li) #r代表的就是当前的节点即当前的leader#这里就是更新leader的match,即表示leader已经持久化到了这里#quorum过半匹配时就是判断的这个match字段#!!!因为这是leader,所以才立即更新,这是leader更新自己#而非leader则需要持久化以后才会更新 if pr.Match < n:pr.Match = n #更新pr.Match字段updated = truepr.ProbeAcked()pr.Next = max(pr.Next, n+1) #更新pr.Next raft.raft.maybeCommit #注意:假设本次请求对应的日志索引是x,#此处commite并不是指提交到x#而是会实时监测一下此时应该提交到哪里,#也就是说maybeCommit仅仅是一个激活操作#具体提交到哪个位置,则由当前的状态决定#这里简单记录一下,后面会详细介绍log.raftlog.maybeCommitl.commitTo #提交leader的commitedIndex代表日志已经提交到这里了raft.raft.bcastAppend #bcastAppend的意思是broadcastAppend,#即广播需要追加的数据即日志到所有peer节点,广播的是MsgApp消息#bcastAppend发过去的消息要干两件事:#第一件事是告知peer节点要追加哪些日志,#第二件事是通知peer节点更新状态#MsgApp消息包含两个部分,第一个部分是需要append的日志,#第二个是peer节点的新状态(比如commitedIndex)#leader拥有所有节点的视图(即progress对象),#然后leader会更新这些progress对象#更新完后通知peer节点把他们自己的状态更新到leader所看到的这个状态,#比如leader的commited到达x了,peer节点也需要把commited索引更新到x。tracker.ProgressTracker.Visit #遍历所有节点,etcd用id来代表集群,visit接受一个lambda函数,#该函数中如果目的节点是自己,那么就会忽略该节点raft.raft.sendAppend #发送MsgApp发到指定节点raft.raft.maybeSendAppend #!!!在发送给follower节点的MsgApp消息内#!!!还会带上leader视角下follower此刻的状态信息(任期、Match、Next)#!!!即leader认为follwer此刻应该提交到这里了,#!!!follower收到后就会按照leader的命令用这些信息更新本机的状态#!!!假设当前收到的请求对应的日志为x,#!!!这个maybesend并不是就发送这条日志x,#!!!相反的是maybesend函数并不关心当前收到的日志#!!!而是会检测状态,凡是满足要求的日志都会被发送,#!!!所以发来一条请求,收到一条日志,仅仅是激活send操作#!!!send操作发送哪些日志并不是由收到的日志决定,而是由当前状态决定pr := tracker.ProgressTracker.Progress[to] #to表目的节点id,etcd用id代表结点,#这里先根据目的节点id获取目的节点的相关信息,#pr表示leader视角下follower节点应该的的状态#pr即progress,即进度的意思,即leader视角下follower节点的日志复制进度#pr.Match:表示follower节点leader节点之间日志的匹配索引#即leader认为follower已经成功复制了leader上[0,pr.Match]内区间内的日志#pr.Next: 表示leader期望赋值给follower的下一条日志的索引#pr即struct tracker.Progress,#前面说的发送哪些日志,就是由这个progres状态变量来决定log.raftLog.entries(pr.Next) #获取leader上日志索引号大于等于pr.Next的所有日志,#[pr.match+1,pr.next-1]这个索引区间#表示这些日志则是已经发送给follower了但是follower还未返回确认#pr表示该某个follower节点上日志复制进度#如果leader上存在日志索引号大于pr.Next的日志,#说明leader有些日志该follower没有,即follower落后了#所以这里就是一次性把follower缺少的日志都发送给follower,#而不是仅仅发送请求中的那条日志x#所以这个mabesend叫做maybe,#如果不存在就说明没有日志要发送,如果存在说明有日志要发送#所以说日志x只是激活这个maybesend操作,#而具体发送哪些日志由实际状态来决定#当然,如果日志太多了,一次发送不了,那么就只会发送一部分,#剩下的等到下一次maybesend被激活再来发送#!!!小结:etcd中这种操作多的是,即a事件只是触发b操作,#!!!b的具体操作则由当时的实际状态决定,而不是由a传递给b的参数决定#!!!如wal日志刷盘操作、apply操作、maybeCommit、maybeSend等m.Type = pb.MsgApp #发送的是MsgApp消息m.Index = pr.Next - 1 #即to上这条msgAPP消息对应的indexm.LogTerm = term #当前任期m.Entries = ents #数据m.Commit = r.raftLog.committed #leader的committedIndex,即leader告知follower自己的状态switch pr.State:case tracker.StateReplicate:tracker.Progress.OptimisticUpdate #乐观更新to节点对应的pr.Next=最后一条日志的Index+1#OptimisticUpdate是乐观更新的意思#正常情况下to的pr.Next都等于leader lastIndex+1,#如果follower落后于leader,#即to节点的pr.next<leader的lastIndex+1#就是说这里leader先乐观更新follower状态#即使follower不是这个状态,#follower落后于leader也没关系#因为在follower报告给leader的heartbeat消息里,#follower会发送自己的实际状态#leader会据此修正pr.Nextpr.Next+=len(entries)tracker.Inflights.Add #表示传输中的消息窗口,用于限制消息的数量和带宽,#如果满了会检测到,从而导致本次发送取消。raft.raft.send(pb.Message{ #把leader视角下follower的状态都填充到这个MsgApp消息里To: to, #消息接收节点对应的idType: pb.MsgApp, #消息类型Index: lastIndex, #此刻follower节点上最后一条日志的索引号应该为lastIndex#如果对不上,follower就会忽略这条MsgApp消息LogTerm: lastTerm, #此刻follower节点上最后一条日志对应的leader的任期#如果对不上,follower就会忽略这条MsgApp消息Entries: ents, #本次leader发送给follower的日志Commit: r.raftLog.committed, #告诉follower,leader上的日志已经提交到这里了}) #把leader视角下follower的状态都填充到这个MsgApp消息里#给peer节点发送一条msgApp消息,#节点收到一条msgapp消息#表示收到leader节点发过来的一条日志同步消息append(raft.msgs) #msgApp消息会放到leader的msgs数组中#会有另一个专门的发送线程不断轮询这个数组,如果发现msgs不为空就发送#即node.run这个goroute#会在每次循环开始都会检测msgAfterAppend和msgs,有数据则表示ready#走完这里表明propose流程已经结束,#propose只需要把对应日志加到本机的unstable数组#以及把消息成功发出去就算成功,#至于有没有过半节点成功则是下一步apply的事情#注意,此时只是propose完成,put还没有完成#apply一批数据后会通知正在wait的chan put请求已完成#然后put才会结束等待,才会返回ok给应用#此时经过一次propose,那么就会产生数据需要处理#比如持久化wal日志、更新commited等#所以下一步就是通知node内部的raftNode来执行持久化wal日志等操作------->3:还是node.run #仍然是node.run函数,和上面的是同一个函数,但是这是下一次循环了#这些代码原本是在select case <-raft.node.propc之前的#这里就是检测是否有数据redy了,如果有就把这些数据放到一个结构里,#然后填充指定chan来通知数据到了,不同的chan配套有不同的动作#此处就是通知底层raftNode有数据来了,你该开始干活了#raftNode负责持久化wal日志等操作raft.node.runfor{ready=raft.RawNode.HasReady #此函数是node.run用来检测是否有数据需要处理,如果有就会激活本次循环,#如果无则本次循环会阻塞在某个chan上,数据有多种,来源也有多种if !r.softState().equal(rn.prevSoftSt): #1:判断软状态是否和之前的状态相同,如果不相同则说明需要处理#软状态:{leader,role},即leader是谁以及当前节点的角色是什么#软状态不会持久化,所以才叫软状态return trueif hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt):#2:判断硬状态是否和之前的状态相同,如果不相同则说明需要处理#硬状态:{vote,term,commit},当前候选人、当前任期、当前已提交的索引#硬状态会在写wal日志的时候一同写入磁盘return trueif r.raftLog.hasPendingSnapshot():#如果有挂起的快照操作,则说明需要处理,#快照的用途是当follower严重落后时leader直接发一个snapshot过来#然后follower收到snapshot后会把他放到usntable的snapshot变量里,#这个变量不为空表明这个snapshot还没有持久化到磁盘,写到磁盘后会置空这个变量return trueif len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts():#3:msgs不为空表示有数据要发送,#unstableEntries不为空则表示有日志需要持久化#hasNextEnts为真表示applyIndex<commitIndex,表示有数据可以applyreturn trueif len(r.readStates) != 0:#4:ReadStats数组是否为空?#不为空则说明有ReadIndex请求已经有过半节点确认当前leader是有效地,#可以唤醒并执行该读请求 return truereturn false #如果上面的条件都没满足则表示本次循环无需处理if ready: #这个hasReady操作在node.run中是在每次循环开始的时候判断,#因为上一次循环中处理MsgProp消息时会把发送给其他节点的写入msgs #所以本轮循环,hasReady会返回true,这些数据会在本次循环中处理,raft.RawNode.readyWithoutAccept #准备好本次要处理的数据,会把所有数据封装到一个raft.Ready结构中#因为etcd所有操作基本都是异步的,所以同一时刻三种数据都会存在:#unstable:需要持久化的数据、#commited:已经提交的数据,需要apply#msgs:发往其他节点的消息(如heartbeat/MsgApp/MsgAppResp等)#正因为如此,所以每次迭代时会同时进行三种操作#即持久化unstable、apply已经commited的数据、发送msgs到peers节点raft.raftlog.nextUnstableEnts #2:获取所有未持久化的日志,这个写操作就是wal(预写日志)raft.unstable.nextEntriesreturn unstable[inprogress+1:] #inprogress之前的数据表示之前已经在持久化过程中或者已经持久化了#只不过还没完成raft.raftlog.nextCommittedEnts #3:获取一批已经commited,需要apply的日志raft.raftlog.maxAppliableIndex #如果allowUseUnstable,那么可以允许提交未持久化的日志,反之不允许hi := l.committedif !allowUnstable { #如果配置不允许使用未提交的数据,则只允许提交已持久化的日志,#offset-1表示最后一条持久化的日志的index#如果不允许使用unstable的日志,#那么这样就可以确保用户一旦收到写入完成的响应,#那么数据对应的wal日志必定已经写入了磁盘#此后即使服务器崩溃,数据不会丢失#如果开启,就说明还没有持久化的日志可以被apply,这样断电以后,有可能数据丢失hi = min(hi, l.unstable.offset-1)}raft.raftlog.slice(applying+1,hi+1,maxSize) # [applied,applying]区间内的数据已经在应用中了# 本次commite[applying+1,hi+1)之间的日志,#至多处理到hi,并且至多处理maxSize字节select #这个select和上面的select case <-raft.node.propc是同一个selectcase readyc <- rd: #数据已经丢给raftNode,即另一个goroute ratNode.start在等待这个radycraft.RawNode.acceptReady #此case主要就是一个发送的目的,一旦发送完成就重置当前的rd结构raft.RawNode.raft.prevSoftSt = rd.SoftStateraft.RawNode.raft.readStates = nilraft.RawNode.raft.msgs = niladvancec = n.advancec #等raftNode处理完后就会发送消息到advance这个chan#捋一下:etcdserver内嵌一个raftNdoe,raftNode内嵌一个raftNodeConfig,#raftNodeConfig又内嵌一个node,node又内嵌一个RawNode#一个rawNode又内嵌一个raft节点以及当前节点的状态#node负责根据请求的当前状态作出决策,然后node调用这个raft对象来实际执行各种操作------->4:raftNode.start #在步骤3的select中我们通过case readyc<-rd 把准备好的数据#发送给了readyc,所以下一步就是处理readyc中的数据了raft.raftNode.start #这里会启动一个goroute,然后启动一个死循环,通过select从chan接受消息go func(): for{selectcase rd := <-r.Ready() #上一步node.run的case readyc <- rd这个操作会激活这个case#raftNode的操作最终都是委托给内部的raft对象的#因为传过来的rd中同时包含了#unstable(准备持久化的数据)/commited(准备apply的数据/msg(准备发送的消息)#所以这个case也会同时干这三件事sync(unstable)、apply(commited)、send(msg)#!!!注意,raftNode的apply只是通知etcdserver去进行apply,#!!!raftNode本身不执行apply#这个case一共做五件事:#1:检查软状态是否有改变(比如集群是不是有新leader了,集群是否有leader),#如果有则调用相关函数#2:如果readStateC数组不为空,#则说明此刻appliedIndex大于等于这个readState所等待的commitedIndex #需要通知etcdserver进行apply#3~5:sync(unstable)、apply(commited)、send(msg)if rd.SoftState != nil: #1:检查集群leaderetcdserver.EtcdServer.setLeader #设置集群leaderupdateLeadership=func(){ #表示这里调用的是一个lambda函数if leader=节点自己:v3compactor.Periodic.Resume #如果leader是节点自己,那么重新开启compact#因为compact会改变集群数据状态,所以etcd只有leader才可以发起compact,#进行compact之前要先走raft同步一条compact日志#compact日志中包含本次要要压缩哪些版本的数据if leader!=节点自己:v3compactor.Periodic.Pause #如果leader节点不是自己,那么就暂停compactif newLeader:s.leaderChangedMu.Lock()lc := s.leaderChangeds.leaderChanged = make(chan struct{})close(lc)s.leaderChangedMu.Unlock()#???待补充:close(lc)时其他goroute会感知到leader变了#我们在并发进行线性读的时候因为要确认是否有过半节点有效,#所以函数requestCurrentIndex会阻塞#直到节点确认过半节点承认节点是当前集群的leader之后#才会去读取当前的appliedInex#如果requestCurrentIndex阻塞的时候收到集群leader改变的消息#那么他就会立刻放弃所有此前还在等待的ReadIndex请求,然后返回错误}()if len(rd.ReadStates) != 0 {r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1] #2:如果readStattes不为空,说明此刻有ReadIndex准备好了,#所以需要唤醒getCurrentIndex函数etcdserver.updateCommittedIndex #设置server上的commitedIndex。#etcdserver的commitedIndex变量#取决于底层的raft日志的commitedIndex#可以这样理解:server有一层状态,#然后server底层的raft日志也有一层状态#raft日志的commited表示日志已经同步到这里了,#对于server来说,这些日志都是可以commited的#即etcdserver的commitedIndex小于等于raft的commitedIndexidx:=commitedEntryLastIndex #本次待apply的最后一条日志的indexetcdserver.EtcdServer.setCommittedIndex(idx) #设置server上的commitedIndexwaitWALSync := shouldWaitWALSync(rd)if waitWALSync: #只要有新unstable的日志,那么就为true#即在apply之前必定会持久化日志wal.Wal.Save #持久化wal日志raftNode.applyc <- toApply{commitedEntry,notifyc chan} #3:通知etcdserver apply(commited)#把准备apply的数据填充到apply chan中,#etcdserver.run会监听这个applyc chan#commitedEntry表示本次要apply的日志,#notify用来同步etcdserver.run里的apply操作#因为etcdserver.run里apply函数#在成功写入bbolt数据库后会调用trrigerSnapshot做一个快照#而本函数也会处理快照,所以为了安全,需要一个notify chan来同步if isLeader: #4:把待发送的消息发送给follower节点(put流程中此处是MsgApp消息)etcdserver.raftNode.processMessages #根据实际情况重新设置消息的某些字段,主要就是丢弃自己发给自己的消息#r.msgs中的消息就是thread 1 中填充的准备发往peer节点的pb.MsgApp消息#和发给自己的raftpb.MsgAppResp消息#这里将MsgAppResp的目的地是自己的消息的目的地设置为0表示丢弃rafthttp.Transport.Send #把r.msgs中的消息发送给peer节点rafthttp.peer.Send #把msg写入一个chan,然后由监听这个chan的其他线程去异步发送请求给peer节点writerch=rafthttp.peer.pick #获取对应的输出chan,支持 streamAppV2、streamMsg、pipelinewriterch<-msgs #把msgs填充到chan中,这个chan会有对应的线程x在监听,#一旦收到chan就会把消息发往指定节点#也就是说发送线程从raftnode.ready chan(实际是node.readyc)接受数据#然后把收到的响应填充到raftNode.recv chan(实际是node.recvc)if !raft.IsEmptySnap(rd.Snapshot)storage.storage.SaveSnap(rd.Snapshot) #5.1:持久化快照点。如果rd的snap字段不为空说明leader发来了snap,#需要对应的snap数据持久化到磁盘上,以供后续回放#说明这个节点当前的角色为follower#注意:这里说的snap就是本文后面apply完成之后#triggersnap函数创建的snap日志文件和snap日志#这个snap点相当于mysql里的checkpoint点,#snapshot操作也是必须得leader发起snap.Snapshotter.SaveSnap #保存snap的数据部分到一个单独的文件中,#数据部分指的是集群状态、term、index,#并不是bbolt数据库中的数据,别搞混了wal.Wal.SaveSnapshot #保存一条snap日志到wal文件中,#snap日志和其他raft日志都是一样的只是日志的类型不同而已if !waitWALSync:storage.storage.Save #5.2:刷盘wal日志,即持久化unstable部分对应的所有日志#是所有usntable日志以及当前的硬状态(term/vote/commit)#unstable之前的日志肯定也已经写入磁盘,#即那些准备返回结果给用户的数据对应的日志肯定已经写入了磁盘#因为etcd会限制未提交的数据大小,所以一旦请求太多,#导致unstable的日志超过阈值,那么集群就会拒绝后续所有新增put请求#当完成这一步的时候可以确保目前leader上所有unstable的日志都已经持久化到了磁盘,#此后崩溃也不怕了#leader发送MsgApp消息给follower节点和leader刷盘wal日志是一个并发操作#刷盘的时候是按日志的index来刷盘的,即index=a的日志如果刷盘失败,#那么index>a的日志肯定不会在存储中#下面的为个人随笔,并非etcd的实现,:#可以简单理解为一条日志刷入磁盘的日志状态有四种取值:#prepare、commited、applied、abort#prepare状态:这是初始状态,已经给follower节点发出MsgApp消息了,#但是还没有收集到足够多的选票#abort状态: 指定时间内没有收到足够多的选票、被其他节点拒绝了,#那么就把prepare状态的日志标记为abort#commited状态:收集到了足够多的选票,则从prepare转变为commited#applied状态:commited状态的日志发布成功则变为applied状态#突然断电后重启后通过重放日志来恢复:#prepare:可以简单的选择直接丢弃或者重试,全在个人取舍#abort状态:直接丢弃#commited:再次apply就行#applied:直接使用#OLAP型数据库starrocks1.9版本的事务处理流程就是与上面的流程几乎一模一样#但是etcd实际不是这么处理的,etcd判断一条日志是不是有效,#是通过集群协商来获取的,而不是直接记录日志的状态#崩溃重启后,etcd节点会获取到集群leader,然后以leader为准,#对于index=a的日志,如果leader上有,且index、term都对的上,#那么这条日志对于该etcd节点来说就是有效地,#相当于上面所说的处于commited状态的日志,#如果这条日志leader上没有或者冲突了,#那么这条日志对于该节点来说就是prepare的,直接丢弃, #etcd这里没有abort,因为所有日志都是以leader为准,比较霸道#如果leader发现一条日志一个follower节点返回reject,#那么该leader就会强制用自己的日志去覆盖该follower的日志#如果leader发现一条日志一直收不到过半节点返回响应,如果是节点宕机或者网络出错,#那么leader在heatbeat部分就会检测到该节点失效#并且会标记该节点并返回失败wal.Wal.Save #是storage.storage.Save函数里调用wal.Wal.Savewal.Wal.saveEntry #保存日志,此处还没有强制刷盘wal.Wal.saveState #保存硬状态,包括:#Term:表示当前的任期#Vote:表示当前节点最后投票给的候选人的ID#Commit:已经被大多数节点确认过的最高索引号。sync=raft.MustSync #判断是否需要强制刷盘,如果写入的entry不为空或者硬状态不等于旧状态则必须强制刷盘if curOff < SegmentSizeBytes: #如果一个段没有写满。默认64MB,不一定是64MB,#因为最后一条日志大小不一定刚好凑满64MB,可能超出一点点if sync:wal.Wal.sync #强制刷盘wal.Wal.Cut #超过了指定的文件大小,#所以先把文件切割成两部分(不会把一条日志保存到两个文件),然后分别刷盘if !raft.IsEmptySnap(rd.Snapshot)wal.Wal.Sync #强制wal刷新 raft.MemoryStorage.ApplySnapshot#应用快照,就是按照发来的snap来设置memtable的相关字段storage.storage.Release #释放掉旧的snap文件的锁,以便purgeFile线程可以异步删除过时的文件。#之前加锁是为了防止文件被其他人其他线程意外删除raft.MemoryStorage.Append #把日志条目追加到memoryTable中,此时已经写好wal了,所以断电也不怕丢失#wal日志会被最新的快照点分为两部分,快照点之前的日志都是保存在磁盘上,#快照点之后的所有日志,etcd还会把他加载到内存中notifyc <- struct{}{} #这个notifyc是上面toApply chan内部的一个chan,#etcdserver.run在处理applyc chan过程中会等待这个notifyc#是snap相关raft.node.Advance #发消息通知道node.run可以进行下一步了,处理完本次ready后通知node往下走,#主要是处理stepsOnApp中leader自己发给自己的的MsgAppResp消息raft.node.advancec <- struct{}{} #node.run会监听这个advancec chan------->5:etcdserver apply #raftNode在步骤4中数据刷盘以后,就表示日志必定已经持久化了#日志同步完了,所以就可以执行put操作了#所以把新检测到的可以apply的数据丢给etcdserver#etcdserver apply就是把任务丢到另一个goroute的fifo队列让他去慢慢apply#一旦apply日志x,就唤醒日志x对应的请求所在goroute返回ok给clientetcdserver.EtcdServer.runforselect:case ap := <-s.r.apply():f := func(context.Context) { s.applyAll(&ep, &ap) }#把applyAll封装到一个job里面,然后丢到异步fifo队列去执行,#注意,fifo是先进先出的意思,fifo队列会等到事务x-1完成才会开始事务x#这样就保证了一定是按日志顺序apply的#一个事务对应一个appliedIndex,所以代码里也是用revision来作为事务id。#另一个方面,之所以开一个异步队列是因为applyc chan的容量只有1,#如果顺序执行如果一直本次apply非常慢,那么就会导致无法接受新的apply请求,#从而导致发送方也阻塞,然后整个系统都阻塞了schedule.Schedule.schedule #把这个job丢到一个fifo队列里,然后一个work线程不断从fifo队列取元素,#这里略,这里直接就转到applyAll函数etcdserver.EtcdServer.applyAll #干两件事:1:把数据写入bbolt; #2:写reqid对应的chan从而通知etcdserver本次客户端请求已经完成了,可以返回#!!!一路都是串行的,即直到把数据写入bbolt数据库才会返回,#!!!即上一个事物还没结束,下一个事务必定不会开始etcdserver.EtcdServer.applySnapshot #应用snapshot。就是把相关变量重置为snapshot文件中指定的相关信息if apply.snapshot.Metadata.Index <= ep.appliedi: #如果leader发来的snapshot中的applyindex#比当前本机的applyIndex还要小,那么panicpanice <-apply.notifyc #等待步骤4中raftNode持久化snapshot到磁盘后才会apply snapshotstorage.OpenSnapshotBackend #打开指定目录下的xx.snap.db文件,这个是真的备份文件,#etcdctl --endpoints xx snapshot save命令#会从--endpoints指定的server上拉取数据并保存到本地#当使用etcdctl snapshot restore的时候#就会去读取指定的文件然后从该文件恢复数据库#更多详情见snapshot那一章mvcc.watchableStore.Restore #读取指定文件(xx.snap.db),然后恢复数据,略etcdserver.EtcdServer.applyEntries #根据日志,把结果写入数据库if firsti > ep.appliedi+1 #必须按日志顺序apply,#如果x-1这条日志还没有applied,那么x就不能apply#ep.applied表示leader上applied的最后一条日志的索引#firsti表示本次准备apply的第一条日志的索引paniccurRev=etcdserver.Etcdserver.apply #应用数据,也就是把数据成功写入etcd,#当写入一条数据后会更新当前集群的revision字段,#apply这些操作是每个节点自己进行的,#无需同步,因为日志已经同步好了,#按顺序apply就可以保证不会出差错。#revision字段是该节点上所有操作都共用的一个字段#即下一个事务的revison就等于curRev+1cindex.consistentIndex.SetConsistentApplyingIndex #设置底层一致性存储的applyingIndex,这一块还不懂,#之所以还搞一个这个,看代码注释说是为了安全case raftpb.EntryNormal: #put数据对应的entry是EntryNormal,#put完之后还要更新applied索引etcdserver.EtcdServer.applyEntryNormal: #干两件事:apply一条日志;#通过ch<-x通知其他goroute本次put操作已结束apply.authApplierV3.Apply #apply之前判断一下是否允许applyapplierV3backend.Apply#根据req类型来调用对应的方法#!!!raft层的作用只是同步一下日志,#会把具体的请求内容封装在日志的data字段#因为raft层只同步日志,完全不关心data字段case r.Put!=nil: #put请求则调用对应的put方法,执行链 auth->quota->backend#raft请求里面有多个指针比如put/delete/txn等,#这些指针有且只有一个不为空apply.authApplierV3.Put(txn=nil) #判断是否允许putstore.authStore.IsPutPermitted #判断该用户是否有权限put到磁盘apply_auth.authApplierV3.checkLeasePuts #判断lease是否正确store.authStore.IsRangePermitted #判断是否有权限RangePutapply.quotaApplierV3.Putstorage.BackendQuota.Available #检查磁盘是否还有足够空间写入数据,#backend可以简单理解为磁盘apply.applierV3backend.Putif txn=nil: #如果是非事务put,则创建事务put#也就是说不管事务还是非事务,底层都统一使用的事务putleases.lessor.Lookup #如果key提前设置了租约,#则租约必须存在,如果没有,则忽略mvcc.watchableStore.Write #创建事务put对象return &watchableStoreTxnWrite{s.store.Write(trace), s}kvstore_txt.store.Writes.mu.RLock() #???待补充tx := s.b.BatchTx()tx.LockInsideApply() #锁住txnbuffer,因为txnbuffere只能顺序写,不允许并发写#一堆又一堆的接口,太恶心了#总之记住txnbuffer里面有一个batchtxn,txnbuffer是对外的#txnbuffer的一切操作之中都会转到batchtxn#笔记中有些地方可能把batchTxnbuffer错记成了batchtx#也有可能把batchtx错记成了batchtxnbufferetw := &storeTxnWrite{storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},tx: tx,beginRev: s.currentRev,changes: make([]mvccpb.KeyValue, 0, 4),}return newMetricsTxnWrite(tw)watchable_store.metricsTxnWrite.Put #mvcc 事务put,这是通用流程 #!!!对应的包名是MVCC#!!!就是说etcd使用的是mvcckvstore_txn.storeTxnWrite.Putkvstore_txn.storeTxnWrite.putrev := tw.beginRev + 1 #本次put的主版本号就是beginRev+1c := rev #c表示create_revision _, created, ver, err=index.treeIndex.Get #查找key是否存在,如果存在则获取他的旧信息#即create_revision、version#不存在则跳过,即默认为NoLeaseif err == nil: #err=nil表示key已存在c = created.main #从中取出key的create_revisionoldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})#取出旧的lease信息idxRev := revision{main: rev, sub: int64(len(tw.changes))}#主版本号+副版本号#每次put只对应一个主版本号,#但是一次put中每put一条一条数据,副版本号就会+1#因为副版本号=len(tw.changes)#他是put一条,就append一条到tw.changes这个数组#所以副版本号从0开始,每次递增#tw.changes包括其他信息即create_revision、#mode_revision、version、key、value、leasever = ver + 1 #当前key的版本号,即每个key都会有自己独立的版本号#ver默认为0,所以每个key的第一个版本号就是0+1=1kv := mvccpb.KeyValue{ #要保存的值:Key: key, #本条日志对应的keyValue: value, #valueCreateRevision: c, #createRevision=旧的createRevision,即不变ModRevision: rev, #ModRevision=本次put请求对应的主版本号Version: ver, #version=key自己独立的版本号+1Lease: int64(leaseID),}#总结一下版本号的概念#rev即revision#etcd有个版本概念的:revision(修订版本号)#revision是相对于整个集群的,#即每次修改操作都对应一个版本号,为上一次操作的版本号+1,#revision有两个字段,main和sub,main表示事务操作的主版本号#同一事务中发生变更的key共享同一main版本号,#sub表示同一事务中发生变更的次数,从0开始#这个变更指的是变更存储#etcd还有个version的概念,这是针对每个key的,互相独立的#举个例子: #假设本次事务为put,且只有一次put,且一次put3条数据#([a,1],[b,1],[c,1]),且磁盘已经存储了a这个key即[a,-1]#且a这个key的version为kv#假设上一次操作后系统的revision版本号为rv#那么put a 1 时(main=rv+1,sub=0,kv=kv+1)#那么put b 2 时(main=rv+1,sub=1,kv=1)#那么put a 3 时(main=rv+1,sub=2,kv=1)#版本号虽然单调递增但是是64位,目前是不可能用完的#注意:别把日志索引(index)和数据版本(revision)混起来了batchTxn.batchTxBuffered.UnsafeSeqPut #前面准备好了数据就需要put了#需要put数据以及修改index#这里先put数据key=(revision),value=(key-value,someRevisionInfo)batchTxn.batchTx.UnsafeSeqPut #执行这个的时候必须已经获取了锁batchTxn.batchTx.unsafePut #把数据存入bbolt,如果是写事务,他可能要put多个keybbolt.Tx.Bucket #获取对应的桶bbolt.Bucket.Put #把数据存放到桶里t.pending++ #bbolt是批量事务提交batchTxn.txWriteBuffer.putSeq #还没看懂,不过不是在这里写入,貌似和查询有关 20240430 19:20#20240505 15:31 嗨,各位,我看懂了,听我西索:#!!!!batchTxbuffered/writebuffer、readbuffer!!!!#backend.batchTx.unsafePut向数据库提交了一个写操作,#但是这个写操作并不是立即提交,而是先写到一个buff,会由其他线程提交#他还会把结果写到readbuffer,这样读的时候就可以直接从readbuff读#写readbuff肯定是要加锁的,所以为了避免put一个key就写一次readbuff#就弄了一个writebuff,这样就先放到writebuff里,最后一次性写入readbuff#他是在调用batchtx.Unlock释放锁的时候一次性把writebuff写入readbuff#写入以后,读事务就可以直接从readbuff读取了,就不用每次都区读数据库了index.treeIndex.Put #把(key,revision)写入内存里的treeIndex,#key=key,value=[]generation{revision},#保存了所有历史版本#这个treeindex是启动时重建的,etcd会把所有key都放到内存,#这限制了etcd支持的数据大小#generation是代的意思,一个key从创建到删除是一个generation#generation记录key从创建到删除中间所有变更的版本信息。#当key被删除后再次被操作,会创建新的generation来记录版本信息。if oldLease != lease.NoLease: #处理lease: detach旧leasetw.s.le.Detach if oldLease != lease.NoLease: #处理lease:attach新leasetw.s.le.Attach watchable_store_txn.watchableStore.End #当put完成后需要释放txnbuffer的锁#!!!此处没有commited,#!!!只是把操作丢到txnbuffer就默认成功,就直接返回了#会有另外一个线程定时commitetw.s.mu.Lock() watchable_store_txn.watchableStore.notify #通知watcher,暂不清楚watchable_store.metricsTxnWrite.End #就是更新一些metricskvstore_txn.storeTxnWrite.End #主要就是加锁和释放锁if len(tw.changes) != 0 {// hold revMu lock to prevent new read txns from opening until writeback.tw.s.revMu.Lock() #changes不为null表示有更改#加写锁保护currentRev#主要是compaction相关tw.s.currentRev++ }tw.tx.Unlock() #释放batchTxn事务锁batch_tx.batchTxBuffered.Unlock #还没看懂,不过不是在这里写入,貌似和查询有关 20240430 19:20#20240505 15:31 嗨,各位,我看懂了,听我西索:#!!!!batchTxbuffered/writebuffer、readbuffer!!!!#backend.batchTx.unsafePut向数据库提交了一个写操作,#但是这个写操作并不是立即提交,而是先写到一个buff,会由其他线程提交#他还会把结果写到readbuffer,这样读的时候就可以直接从readbuff读#写readbuff肯定是要加锁的,所以为了避免put一个key就写一次readbuff#就弄了一个writebuff,这样就先放到writebuff里,最后一次性写入readbuff#他是在调用batchtx.Unlock释放锁的时候一次性把writebuff写入readbuff#写入以后,读事务就可以直接从readbuff读取了,就不用每次都区读数据库了if t.pending != 0:t.backend.readTx.Lock() #所有读请求都会对这个readTx加读锁,#并且直到请求完成才会释放锁#因为要修改readBuffer,#所以会等到所有读请求完成并释放锁后才可以加写锁 txWriteBuffer.writeback #把writebuf中的内容写到readbufft.backend.readTx.Unlock() #释放锁if t.pending >= t.backend.batchLimit || #batchTxn就是批量事务#如果pending的事务数超过了阈值#那么就提交,默认是1wt.pendingDeleteOperations > 0 #或者挂起的delete操作数大于0{t.commit(false) #提交事务t.backend.readTx.Lock() #要提交事务必须先阻止所有读batch_tx.batchTxBuffered.unsafeCommit #提交事务t.backend.hooks.OnPreCommitUnsafe(t) #执行hookif t.backend.readTx.tx != nil: #tx是bbolt.tx #如果bbolt.readTx不为null#则需要等待这些读事务完成才能commit go func(tx *bolt.Tx, wg *sync.WaitGroup):wg.Wait()bbolt.tx.Rollback()t.backend.readTx.reset()batch_tx.batchTx.commitbbolt.tx.Commit() #bbolt的事务提交sdkt.pending = 0t.pendingDeleteOperations = 0t.backend.readTx.Unlock() #提交完毕,释放锁}t.batchTx.Unlock() #释放txnbuffer,由此可以观之,同一时刻,#只会有一个事务修改batchTxnBuffered#因为etcd是按日志顺序for循环逐条串行apply的,#所以对于普通的put/delete,一般是串行的#加锁我猜主要是为了和compact等竞争#(snapshot和defrag不清楚)if len(tw.changes) != 0 {tw.s.revMu.Unlock() #释放tw.s.revMu.Lock锁}tw.s.mu.RUnlock()id := raftReq.IDwait.list.Trigger #数据已经写入数据库了即apply成功,#所以这里把applyInternalRaftRequest的结果写入reqid对应的chan,#从而通知processInternalRaftRequestOnce已完成put操作#processInternalRaftRequestOnce收到结果后就返回给用户了idx=id%defalut_lengthch := w.e[idx].m[id]ch <- xclose(ch) #这里是关闭chanetcdserver.EtcdServer.setAppliedIndex #更新apply索引etcdserver.EtcdServer.setTerm #更新任期wait_time.timeList.Trigger(appliedIndex) #唤醒所有阻塞在appliedIndex之前的读请求。#etcd ReadIndex(线性读):#当读请求到来时会记录下此刻的commitedIndex值,#并保存到confirmIndex变量中,#只有当appliedIndex>=confirmIndex的时候读请求才会解除阻塞,#才会去数据库读数据,#这里已经apply到appliedIndex了,所以在此之前的读请求都可以被唤醒了<- apply.notifyc #在上一步所有wal日志刷盘以后才会通知etcdserver进行快照etcdserver.EtcdServer.triggerSnapshot if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount:return #如果当前最新的appliedIndex减去当前快照的最后一条日志的Index#超过了阈值就触发快照,否则跳过,默认是1w条etcdserver.EtcdServer.snapshot #创建快照clone := s.v2store.Clone() #克隆当前store#etcd是所有数据都存放在内存,所以直接内存中复制#!!!v3.6即main分支会使用v3,不会执行clone #!!!而v3.5则还是使用v2存储,snapshot的时候会执行克隆s.KV().Commit() #强制提交以便kv持久化metadatago func(){clone.SaveNoCopy()return json.Marshal #返回json格式编码的storeraft.MemoryStorage.CreateSnapshot #更新memtable里的snapmetadatastorage.storage.SaveSnap #把集群状态相关信息(成员、term、index)写入一个新创建的xx.snap文件,#同时在wal日志中写一条snapshot的日志#之后由异步的purge线程在下一次启动的时候#根据这个xx.snap文件中的配置信息去执行删除操作#(比如把所有index<x的wal日志文件都删掉)#同时如果节点重启,那么节点会把applideIndex设置为最新的快照中的index,#然后从此处开始重新apply日志storage.storage.Release #清理member/snap目录下过时的以.snap.db结尾的文件wal.Wal.ReleaseLockTo #释放过时的wal文件的锁,否则这些文件会被etcd锁住,#purgeFile线程无法打开snap.Snapshotter.ReleaseSnapDBs #删除掉member/snap目录下以.snap.db结尾的文件#因为我们已经在applyEntries之前applySnapShot过了,所以可以删除s.r.raftStorage.Compact(compacti)------->6:node.run处理recvc #处理follower节点发过来的消息,#put流程则对应MsgAppResp即leader发MsgApp给follwer,#follower然后返回MsgAppResp#由发送线程异步发送#当发送线程收到响应的时候就会把消息填充到n.recvc chan#步骤4中处理完后会写advance这个chan来通知node.run前进raft.node.runfor{selectcase m := <-n.recvc: if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type): #这个if会过滤掉来源未知的响应消息raft.raft.Stepraft.stepLeaderselectcase pb.MsgAppResp: #处理日志append成功的消息,主要操作是修改leader的commitedIndexif tracker.Progress.MaybeUpdate #判断的同时更新消息来源节点对应的progress对象的Match和Next字段#m.index表示该条MsgAppResp消息中日志的索引#MsgAppResp表示成功追加了一条日志,而raft不允许日志空洞,#因此收到一条日志索引字段为10的MsgAppResp消息就表明10之前的所有日志,#该节点上都有,所以直接把令pr.Match=m.index,pr.Next=m.index+1raft.raft.maybeCommit #判断是否可以进行一次提交操作,即是否有过半的节点已经成功追加日志#节点对应的progress对象的Match字段用来表示该节点上日志已经复制到哪个位置了#(关闭allowUnstable时match<unstable.offset)#收到一个MsgAppResp消息的时候#我们在MaybeUpdate里会更新消息来源节点的progress对象的Match字段#就是说leader有一个progerss数组,#然后每收到一个MsgAppResp消息就通过MaybeUpdate更新一个pregress对象的Match#更新完之后就调用maybeCommite判断一次,#判断是否有过半节点的progress对象的Match字段已经大于等于leader的commitedIndex#就是先对所有match字段升序排序,#然后取n - (n/2 + 1)这个位置的match,#如果大于leader的commitedIndex说明已经有过半节点成功追加日志#即[commitedIndex,Matchs[n-(n/2+1)]这个区间内的日志是新增的#并且是可以commited的日志。etcd他是按顺序把日志写入wal文件,#wal文件中日志的状态是不知道的,所以etcd是通过一些索引变量来判断该日志是什么状态的#这些索引变量就是applied/commited/unstable,#如果一条日志的索引即index>commited就说明该日志还没有commited#因为wal日志已经持久化了,就算崩溃了也没事,#因为wal日志中每条记录都包含了(term/index/type/data四个字段)#集群启动的时候,applied/unstable这些变量的值都是可以自动生成#commited会作为硬状态的一部分写入磁盘tracker.ProgressTracker.Committed #获取排序后n-(n/2+1)这个位置的progrees对象的match字段raft.raftlog.maybeCommit #比较一下中间那个match是否大于当前leader的commitedIndex,#如果大于就说明有日志可以提交raft.raftlog.commitTo #把leader节点的commiteIndex更新到n-(n/2+1)这个位置对应的match值raft.releasePendingReadIndexMessages #释放之前挂起的一致性读请求raft.raft.bcastAppend #这次会发一条不带任何日志的MsgApp消息给所有其他节点通知他们已经提交到这里了#bcastAppend发过去的MsgApp消息两部分:待append的日志和peer节点新状态#因为代码是通过MsgAppResp消息来激活检测,#而通过maybeCommite中检测时是直接检测所有集群的日志提交情况#并不是检测日志是否更新到了MsgAppResp中的日志所对应的index,#即收到MsgAppResp只是告知程序需要检测一下集群提交状态#如果有新日志可以提交,就通知所有节点更新commitedIndex#比如三个节点的集群(leader,follwer1,follwer2),#commitedindex分别为(x,x,x),然后经过一段时间运行#三个节点的日志分别已经追加到了(x+3,x+1,x+2),#此时收到了一个迟来的MsgAppResp,这个MsgAppResp中的消息对应的日志索引为x-1,#然后这个消息就会激活检测,#leader检测到此时x+2及x+2以前的日志都是可以提交的#(n-(n/2+1)=1,排序后match[1]=x+2#leader就会通知他们把commitedIndex更新到x+2,#更新完以后commitedIndex分别是是(x+2,x+1,x+2)#因为过半就行,所以leader和follower2的commitedIndex都可以更新到x+2,#而follower1的日志最多只能到x+1,所以只更新到x+1#这种情况就是follower1滞后了,没关系,后面赶上就行了,#目前集群还是有过半节点正常的,可以正常运行------->7:node.run处理advancec #步骤4中会写advancec,通知node前进raft.node.runrawNode.RawNode.Advanceraft.raft.advancelog.raftlog.appliedTo #更新applyindexraftLog.stableTo #shrink unstable数组,也就是已经apply的数据可以丢弃从而减少unstable数组大小#步骤5中会