gofunc (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
cm.dlog("AppendEntries: %+v", args)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in AppendEntries")
cm.becomeFollower(args.Term)
}
reply.Success = false
if args.Term == cm.currentTerm {
if cm.state != Follower {
cm.becomeFollower(args.Term)
}
cm.electionResetEvent = time.Now()
// Does our log contain an entry at PrevLogIndex whose term matches
// PrevLogTerm? Note that in the extreme case of PrevLogIndex=-1 this is
// vacuously true.
if args.PrevLogIndex == -1 ||
(args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {
reply.Success = true
// Find an insertion point - where there's a term mismatch between
// the existing log starting at PrevLogIndex+1 and the new entries sent
// in the RPC.
logInsertIndex := args.PrevLogIndex + 1
newEntriesIndex := 0
for {
if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {
break
}
if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {
break
}
logInsertIndex++
newEntriesIndex++
}
// At the end of this loop:
// - logInsertIndex points at the end of the log, or an index where the
// term mismatches with an entry from the leader
// - newEntriesIndex points at the end of Entries, or an index where the
// term mismatches with the corresponding log entry
if newEntriesIndex < len(args.Entries) {
cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)
cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)
cm.dlog("... log is now: %v", cm.log)
}
// Set commit index.
if args.LeaderCommit > cm.commitIndex {
cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)
cm.dlog("... setting commitIndex=%d", cm.commitIndex)
cm.newCommitReadyChan <- struct{}{}
}
} else {
// No match for PrevLogIndex/PrevLogTerm. Populate
// ConflictIndex/ConflictTerm to help the leader bring us up to date
// quickly.
if args.PrevLogIndex >= len(cm.log) {
reply.ConflictIndex = len(cm.log)
reply.ConflictTerm = -1
} else {
// PrevLogIndex points within our log, but PrevLogTerm doesn't match
// cm.log[PrevLogIndex].
reply.ConflictTerm = cm.log[args.PrevLogIndex].Term
var i int
for i = args.PrevLogIndex - 1; i >= 0; i-- {
if cm.log[i].Term != reply.ConflictTerm {
break
}
}
reply.ConflictIndex = i + 1
}
}
}
reply.Term = cm.currentTerm
cm.persistToStorage()
cm.dlog("AppendEntries reply: %+v", *reply)
return nil
}
go// RequestVote RPC.
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.persistToStorage()
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!