以下主要是 Raft 选举部分的细节代码,不包含其他逻辑。
gofunc NewConsensusModule(id int, peerIds []int, server *Server, ready <-chan interface{}) *ConsensusModule {
cm := new(ConsensusModule)
cm.id = id
cm.peerIds = peerIds
cm.server = server
cm.state = Follower
cm.votedFor = -1
go func() { // 等待就绪
// The CM is quiescent until ready is signaled; then, it starts a countdown
// for leader election.
<-ready
cm.mu.Lock()
cm.electionResetEvent = time.Now() //重置选举时间
cm.mu.Unlock()
cm.runElectionTimer()
}()
return cm
}
go// RequestVote RPC.
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
gofunc (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now() //相当于是心跳一次
reply.Success = true
}
reply.Term = cm.currentTerm
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
go// electionTimeout generates a pseudo-random election timeout duration.
func (cm *ConsensusModule) electionTimeout() time.Duration {
// If RAFT_FORCE_MORE_REELECTION is set, stress-test by deliberately
// generating a hard-coded number very often. This will create collisions
// between different servers and force more re-elections.
if len(os.Getenv("RAFT_FORCE_MORE_REELECTION")) > 0 && rand.Intn(3) == 0 {
return time.Duration(150) * time.Millisecond
} else {
return time.Duration(150+rand.Intn(150)) * time.Millisecond
}
}
不断地判断领导者(Leader)是否存活:
go// runElectionTimer 实现了一个选举计时器。当我们希望启动一个计时器以开始新一轮选举时,
// 应该调用这个方法。这个方法是阻塞的,应该在一个单独的 goroutine 中运行。
// 它为单次(one-shot)的选举计时器设计,因为它会在以下情况下退出:
// - 当 CM 的状态从 Follower/Candidate 改变时
// - 当当前任期(term)改变时
func (cm *ConsensusModule) runElectionTimer() {
timeoutDuration := cm.electionTimeout() // 获取选举超时时间间隔
cm.mu.Lock()
termStarted := cm.currentTerm // 记录选举计时器启动时的当前任期
cm.mu.Unlock()
cm.dlog("election timer started (%v), term=%d", timeoutDuration, termStarted)
// 以下是一个循环,它会一直运行,直到发生以下情况之一:
// - 我们发现选举计时器不再需要,或
// - 选举计时器超时,该 CM 成为 Candidate
// 在 Follower 中,这通常在 CM 的生命周期中一直在后台运行。
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
<-ticker.C
cm.mu.Lock()
// 如果 CM 的状态变成 Candidate 或 Follower,则退出选举计时器
if cm.state != Candidate && cm.state != Follower {
cm.dlog("在选举计时器中,状态=%s,退出", cm.state)
cm.mu.Unlock()
return
}
// 如果当前任期(term)改变,则退出选举计时器
if termStarted != cm.currentTerm {
cm.dlog("在选举计时器中,任期从 %d 变为 %d,退出", termStarted, cm.currentTerm)
cm.mu.Unlock()
return
}
// 如果经过了选举超时时间间隔,而且我们还没有收到领袖的消息或者没有投票给其他 Candidate,
// 则开始一轮新的选举。
if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration {
cm.startElection()
cm.mu.Unlock()
return
}
cm.mu.Unlock()
}
}
go// startElection starts a new election with this CM as a candidate.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) startElection() {
cm.state = Candidate
cm.currentTerm += 1
savedCurrentTerm := cm.currentTerm
cm.electionResetEvent = time.Now()
cm.votedFor = cm.id
cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)
votesReceived := 1
// Send RequestVote RPCs to all other servers concurrently.
for _, peerId := range cm.peerIds {
go func(peerId int) {
args := RequestVoteArgs{
Term: savedCurrentTerm,
CandidateId: cm.id,
}
var reply RequestVoteReply
cm.dlog("sending RequestVote to %d: %+v", peerId, args)
if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.dlog("received RequestVoteReply %+v", reply)
if cm.state != Candidate {
cm.dlog("while waiting for reply, state = %v", cm.state)
return
}
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in RequestVoteReply")
cm.becomeFollower(reply.Term)
return
} else if reply.Term == savedCurrentTerm {
if reply.VoteGranted {
votesReceived += 1
if votesReceived*2 > len(cm.peerIds)+1 {
// Won the election!
cm.dlog("wins election with %d votes", votesReceived)
cm.startLeader()
return
}
}
}
}
}(peerId)
}
// Run another election timer, in case this election is not successful.
go cm.runElectionTimer()
}
go// becomeFollower makes cm a follower and resets its state.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) becomeFollower(term int) {
cm.dlog("becomes Follower with term=%d; log=%v", term, cm.log)
cm.state = Follower
cm.currentTerm = term
cm.votedFor = -1
cm.electionResetEvent = time.Now()
go cm.runElectionTimer()
}
go// startLeader switches cm into a leader state and begins process of heartbeats.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) startLeader() {
cm.state = Leader
cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)
go func() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
// Send periodic heartbeats, as long as still leader.
for {
cm.leaderSendHeartbeats()
<-ticker.C
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
cm.mu.Unlock()
}
}()
}
go// leaderSendHeartbeats sends a round of heartbeats to all peers, collects their
// replies and adjusts cm's state.
func (cm *ConsensusModule) leaderSendHeartbeats() {
cm.mu.Lock()
if cm.state != Leader {
cm.mu.Unlock()
return
}
savedCurrentTerm := cm.currentTerm
cm.mu.Unlock()
for _, peerId := range cm.peerIds {
args := AppendEntriesArgs{
Term: savedCurrentTerm,
LeaderId: cm.id,
}
go func(peerId int) {
cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, 0, args)
var reply AppendEntriesReply
if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
cm.mu.Lock()
defer cm.mu.Unlock()
if reply.Term > savedCurrentTerm {
cm.dlog("term out of date in heartbeat reply")
cm.becomeFollower(reply.Term)
return
}
}
}(peerId)
}
}
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!