编辑
2023-10-21
数据结构与算法
00
请注意,本文编写于 567 天前,最后修改于 567 天前,其中某些信息可能已经过时。

目录

初始化时等待就绪
开始决定是否投票
同步日志
生成选举超时时间
选举触发
开始选举
成为Follower
Leader
发送心跳

本文只记录 Raft 领导者选举部分的细节代码,不涉及其他逻辑。

初始化时等待就绪

go
func NewConsensusModule(id int, peerIds []int, server *Server, ready <-chan interface{}) *ConsensusModule { cm := new(ConsensusModule) cm.id = id cm.peerIds = peerIds cm.server = server cm.state = Follower cm.votedFor = -1 go func() { // 等待就绪 // The CM is quiescent until ready is signaled; then, it starts a countdown // for leader election. <-ready cm.mu.Lock() cm.electionResetEvent = time.Now() //重置选举时间 cm.mu.Unlock() cm.runElectionTimer() }() return cm }

开始决定是否投票

go
// RequestVote RPC. func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error { cm.mu.Lock() defer cm.mu.Unlock() if cm.state == Dead { return nil } cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor) if args.Term > cm.currentTerm { cm.dlog("... term out of date in RequestVote") cm.becomeFollower(args.Term) } if cm.currentTerm == args.Term && (cm.votedFor == -1 || cm.votedFor == args.CandidateId) { reply.VoteGranted = true cm.votedFor = args.CandidateId cm.electionResetEvent = time.Now() } else { reply.VoteGranted = false } reply.Term = cm.currentTerm cm.dlog("... RequestVote reply: %+v", reply) return nil }

同步日志

go
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error { cm.mu.Lock() defer cm.mu.Unlock() if cm.state == Dead { return nil } cm.dlog("AppendEntries: %+v", args) if args.Term > cm.currentTerm { cm.dlog("... term out of date in AppendEntries") cm.becomeFollower(args.Term) } reply.Success = false if args.Term == cm.currentTerm { if cm.state != Follower { cm.becomeFollower(args.Term) } cm.electionResetEvent = time.Now() //相当于是心跳一次 reply.Success = true } reply.Term = cm.currentTerm cm.dlog("AppendEntries reply: %+v", *reply) return nil }

生成选举超时时间

go
// electionTimeout generates a pseudo-random election timeout duration. func (cm *ConsensusModule) electionTimeout() time.Duration { // If RAFT_FORCE_MORE_REELECTION is set, stress-test by deliberately // generating a hard-coded number very often. This will create collisions // between different servers and force more re-elections. if len(os.Getenv("RAFT_FORCE_MORE_REELECTION")) > 0 && rand.Intn(3) == 0 { return time.Duration(150) * time.Millisecond } else { return time.Duration(150+rand.Intn(150)) * time.Millisecond } }

选举触发

不断地判断领导者是否存活

go
// runElectionTimer 实现了一个选举计时器。当我们希望启动一个计时器以开始新一轮选举时, // 应该调用这个方法。这个方法是阻塞的,应该在一个单独的 goroutine 中运行。 // 它为单次(one-shot)的选举计时器设计,因为它会在以下情况下退出: // - 当 CM 的状态从 Follower/Candidate 改变时 // - 当当前任期(term)改变时 func (cm *ConsensusModule) runElectionTimer() { timeoutDuration := cm.electionTimeout() // 获取选举超时时间间隔 cm.mu.Lock() termStarted := cm.currentTerm // 记录选举计时器启动时的当前任期 cm.mu.Unlock() cm.dlog("election timer started (%v), term=%d", timeoutDuration, termStarted) // 以下是一个循环,它会一直运行,直到发生以下情况之一: // - 我们发现选举计时器不再需要,或 // - 选举计时器超时,该 CM 成为 Candidate // 在 Follower 中,这通常在 CM 的生命周期中一直在后台运行。 ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() for { <-ticker.C cm.mu.Lock() // 如果 CM 的状态变成 Candidate 或 Follower,则退出选举计时器 if cm.state != Candidate && cm.state != Follower { cm.dlog("在选举计时器中,状态=%s,退出", cm.state) cm.mu.Unlock() return } // 如果当前任期(term)改变,则退出选举计时器 if termStarted != cm.currentTerm { cm.dlog("在选举计时器中,任期从 %d 变为 %d,退出", termStarted, cm.currentTerm) cm.mu.Unlock() return } // 如果经过了选举超时时间间隔,而且我们还没有收到领袖的消息或者没有投票给其他 Candidate, // 则开始一轮新的选举。 if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration { cm.startElection() cm.mu.Unlock() return } cm.mu.Unlock() } }

开始选举

go
// startElection starts a new election with this CM as a candidate. // Expects cm.mu to be locked. func (cm *ConsensusModule) startElection() { cm.state = Candidate cm.currentTerm += 1 savedCurrentTerm := cm.currentTerm cm.electionResetEvent = time.Now() cm.votedFor = cm.id cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log) votesReceived := 1 // Send RequestVote RPCs to all other servers concurrently. for _, peerId := range cm.peerIds { go func(peerId int) { args := RequestVoteArgs{ Term: savedCurrentTerm, CandidateId: cm.id, } var reply RequestVoteReply cm.dlog("sending RequestVote to %d: %+v", peerId, args) if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil { cm.mu.Lock() defer cm.mu.Unlock() cm.dlog("received RequestVoteReply %+v", reply) if cm.state != Candidate { cm.dlog("while waiting for reply, state = %v", cm.state) return } if reply.Term > savedCurrentTerm { cm.dlog("term out of date in RequestVoteReply") cm.becomeFollower(reply.Term) return } else if reply.Term == savedCurrentTerm { if reply.VoteGranted { votesReceived += 1 if votesReceived*2 > len(cm.peerIds)+1 { // Won the election! cm.dlog("wins election with %d votes", votesReceived) cm.startLeader() return } } } } }(peerId) } // Run another election timer, in case this election is not successful. go cm.runElectionTimer() }

成为Follower

go
// becomeFollower makes cm a follower and resets its state. // Expects cm.mu to be locked. func (cm *ConsensusModule) becomeFollower(term int) { cm.dlog("becomes Follower with term=%d; log=%v", term, cm.log) cm.state = Follower cm.currentTerm = term cm.votedFor = -1 cm.electionResetEvent = time.Now() go cm.runElectionTimer() }

Leader

go
// startLeader switches cm into a leader state and begins process of heartbeats. // Expects cm.mu to be locked. func (cm *ConsensusModule) startLeader() { cm.state = Leader cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log) go func() { ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() // Send periodic heartbeats, as long as still leader. for { cm.leaderSendHeartbeats() <-ticker.C cm.mu.Lock() if cm.state != Leader { cm.mu.Unlock() return } cm.mu.Unlock() } }() }

发送心跳

go
// leaderSendHeartbeats sends a round of heartbeats to all peers, collects their // replies and adjusts cm's state. func (cm *ConsensusModule) leaderSendHeartbeats() { cm.mu.Lock() if cm.state != Leader { cm.mu.Unlock() return } savedCurrentTerm := cm.currentTerm cm.mu.Unlock() for _, peerId := range cm.peerIds { args := AppendEntriesArgs{ Term: savedCurrentTerm, LeaderId: cm.id, } go func(peerId int) { cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, 0, args) var reply AppendEntriesReply if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil { cm.mu.Lock() defer cm.mu.Unlock() if reply.Term > savedCurrentTerm { cm.dlog("term out of date in heartbeat reply") cm.becomeFollower(reply.Term) return } } }(peerId) } }

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!