编辑
2023-10-21
数据结构与算法
00
请注意,本文编写于 567 天前,最后修改于 567 天前,其中某些信息可能已经过时。
go
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error { cm.mu.Lock() defer cm.mu.Unlock() if cm.state == Dead { return nil } cm.dlog("AppendEntries: %+v", args) if args.Term > cm.currentTerm { cm.dlog("... term out of date in AppendEntries") cm.becomeFollower(args.Term) } reply.Success = false if args.Term == cm.currentTerm { if cm.state != Follower { cm.becomeFollower(args.Term) } cm.electionResetEvent = time.Now() // Does our log contain an entry at PrevLogIndex whose term matches // PrevLogTerm? Note that in the extreme case of PrevLogIndex=-1 this is // vacuously true. if args.PrevLogIndex == -1 || (args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) { reply.Success = true // Find an insertion point - where there's a term mismatch between // the existing log starting at PrevLogIndex+1 and the new entries sent // in the RPC. logInsertIndex := args.PrevLogIndex + 1 newEntriesIndex := 0 for { if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) { break } if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term { break } logInsertIndex++ newEntriesIndex++ } // At the end of this loop: // - logInsertIndex points at the end of the log, or an index where the // term mismatches with an entry from the leader // - newEntriesIndex points at the end of Entries, or an index where the // term mismatches with the corresponding log entry if newEntriesIndex < len(args.Entries) { cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex) cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...) cm.dlog("... log is now: %v", cm.log) } // Set commit index. if args.LeaderCommit > cm.commitIndex { cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1) cm.dlog("... setting commitIndex=%d", cm.commitIndex) cm.newCommitReadyChan <- struct{}{} } } else { // No match for PrevLogIndex/PrevLogTerm. Populate // ConflictIndex/ConflictTerm to help the leader bring us up to date // quickly. if args.PrevLogIndex >= len(cm.log) { reply.ConflictIndex = len(cm.log) reply.ConflictTerm = -1 } else { // PrevLogIndex points within our log, but PrevLogTerm doesn't match // cm.log[PrevLogIndex]. reply.ConflictTerm = cm.log[args.PrevLogIndex].Term var i int for i = args.PrevLogIndex - 1; i >= 0; i-- { if cm.log[i].Term != reply.ConflictTerm { break } } reply.ConflictIndex = i + 1 } } } reply.Term = cm.currentTerm cm.persistToStorage() cm.dlog("AppendEntries reply: %+v", *reply) return nil }
go
// RequestVote RPC. func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error { cm.mu.Lock() defer cm.mu.Unlock() if cm.state == Dead { return nil } lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm() cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm) if args.Term > cm.currentTerm { cm.dlog("... term out of date in RequestVote") cm.becomeFollower(args.Term) } if cm.currentTerm == args.Term && (cm.votedFor == -1 || cm.votedFor == args.CandidateId) && (args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) { reply.VoteGranted = true cm.votedFor = args.CandidateId cm.electionResetEvent = time.Now() } else { reply.VoteGranted = false } reply.Term = cm.currentTerm cm.persistToStorage() cm.dlog("... RequestVote reply: %+v", reply) return nil }

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!