在etcd的项目下有一个使用raft的示例,在之前读etcd代码的时候会比较难理解raft相关的代码。因此通过这个示例会更容易的了解raft相关的实现细节。
我将这部分代码推送到了我的git仓库:https://github.com/yugu2day/raftexample
在示例中,主要是构建了一个基于map的k-v存储服务, 支持PUT
和 GET
对键值内容的存取,通过POST
和 DELETE
来添加/删除raft集群中的节点。
如何使用示例
可以按照代码中的README
在本地启动这个简单的示例, 这里也大概描述一下过程。在构建出可执行文件并通过启动参数设置节点ID,服务端口之后,就可以通过HTTP请求进行键值对的更新查询。可以从下面的输出看到在启动之后会有一些日志打印出来, 我们启动的节点id是1,在日志中就包括member1 在不同的term
中的角色切换。
go build -o raftexample
./raftexample --id 1 --cluster http://127.0.0.1:12379 --port 123802024/08/26 16:07:28 replaying WAL of member 1
2024/08/26 16:07:28 loading WAL at term 0 and index 0
raft2024/08/26 16:07:28 INFO: 1 switched to configuration voters=()
raft2024/08/26 16:07:28 INFO: 1 became follower at term 0
raft2024/08/26 16:07:28 INFO: newRaft 1 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
raft2024/08/26 16:07:28 INFO: 1 became follower at term 1
raft2024/08/26 16:07:28 INFO: 1 switched to configuration voters=(1)
raft2024/08/26 16:07:28 INFO: 1 switched to configuration voters=(1)
raft2024/08/26 16:07:29 INFO: 1 is starting a new election at term 1
raft2024/08/26 16:07:29 INFO: 1 became candidate at term 2
raft2024/08/26 16:07:29 INFO: 1 received MsgVoteResp from 1 at term 2
raft2024/08/26 16:07:29 INFO: 1 became leader at term 2
raft2024/08/26 16:07:29 INFO: raft.node: 1 elected leader 1 at term 2
通过http请求启动的服务, 可以看到现在GET
和 PUT
请求是能够正常的获取和更新键值对的内容的。
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
curl -L http://127.0.0.1:12380/my-key
hello%
curl -L http://127.0.0.1:12380/my-key -XPUT -d hello11
hello11%
raftnode 的启动
在main函数中,会根据我们的启动参数构建对应的raftnode,后续raft相关消息的传递都是由raftnode完成。
func main() {...getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)...
}func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {commitC := make(chan *commit)errorC := make(chan error)rc := &raftNode{proposeC: proposeC,confChangeC: confChangeC,commitC: commitC,errorC: errorC,id: id,peers: peers,join: join,waldir: fmt.Sprintf("raftexample-%d", id),snapdir: fmt.Sprintf("raftexample-%d-snap", id),getSnapshot: getSnapshot,snapCount: defaultSnapshotCount,stopc: make(chan struct{}),httpstopc: make(chan struct{}),httpdonec: make(chan struct{}),logger: zap.NewExample(),snapshotterReady: make(chan *snap.Snapshotter, 1),// rest of structure populated after WAL replay}go rc.startRaft()return commitC, errorC, rc.snapshotterReady
}
在初始化raftnode实例后, 会开启一个goroutine启动该节点。可以注意一下waldir
和snapDir
, 在启动示例之后我们本地也会多出两个文件夹。这两个文件夹与我们存储内容的持久化相关, 因此在启动node之后首先会加载这两个文件的内容,将快照内容和WAL的内容加载到内存中。(这里的概念和redolog类似)加载结束后会写rc.snapshotterReady
channel, main函数中就会初始化kv实例并启动web服务。
func main(){...kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)// the key-value http handler will propose updates to raftserveHttpKVAPI(kvs, *kvport, confChangeC, errorC)...
在replay结束后,会创建对应的raft通信相关的对象,并添加其他节点到本节点的关联关系。
rc.transport = &rafthttp.Transport{Logger: rc.logger,ID: types.ID(rc.id),ClusterID: 0x1000,Raft: rc,ServerStats: stats.NewServerStats("", ""),LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),ErrorC: make(chan error),}rc.transport.Start()for i := range rc.peers {if i+1 != rc.id {rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})}}
最后起两个goroutine, 分别处理raft相关的网络请求和相关channel的消息。
go rc.serveRaft()
go rc.serveChannels()
PUT请求的处理
这部分API的代码比较简单,将k-v解析出来之后会序列化并写入此前创建node时返回的proposeC
channel 中。在案例中并没有等待raft处理的结果,而是直接返回了状态码。因此我们在PUT之后马上GET可能不能马上得到最新的结果。那么消息写入proposeC
中之后, 就会在前面提到的serveChannels
方法中处理。 最终这个消息会和前面文章提到PUT请求的处理类似将消息加载到ready实例上。
在示例中, 会将ready实例的内容写WAL,并写入commitC
。
// store raft entries to wal, then publish over commit channelcase rd := <-rc.node.Ready():rc.wal.Save(rd.HardState, rd.Entries)if !raft.IsEmptySnap(rd.Snapshot) {rc.saveSnap(rd.Snapshot)rc.raftStorage.ApplySnapshot(rd.Snapshot)rc.publishSnapshot(rd.Snapshot)}rc.raftStorage.Append(rd.Entries)rc.transport.Send(rd.Messages)applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))if !ok {rc.stop()return}rc.maybeTriggerSnapshot(applyDoneC)rc.node.Advance()
在初始化的kvStore里, 会处理commitC里的消息。并将其应用到map中
func (s *kvstore) readCommits(commitC <-chan *commit, errorC <-chan error) {for commit := range commitC {if commit == nil {// signaled to load snapshotsnapshot, err := s.loadSnapshot()if err != nil {log.Panic(err)}if snapshot != nil {log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)if err := s.recoverFromSnapshot(snapshot.Data); err != nil {log.Panic(err)}}continue}for _, data := range commit.data {var dataKv kvdec := gob.NewDecoder(bytes.NewBufferString(data))if err := dec.Decode(&dataKv); err != nil {log.Fatalf("raftexample: could not decode message (%v)", err)}s.mu.Lock()s.kvStore[dataKv.Key] = dataKv.Vals.mu.Unlock()}close(commit.applyDoneC)}if err, ok := <-errorC; ok {log.Fatal(err)}
}