gofunc (cm *ConsensusModule) persistToStorage() {
var termData bytes.Buffer
if err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil {
log.Fatal(err)
}
cm.storage.Set("currentTerm", termData.Bytes())
var votedData bytes.Buffer
if err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil {
log.Fatal(err)
}
cm.storage.Set("votedFor", votedData.Bytes())
var logData bytes.Buffer
if err := gob.NewEncoder(&logData).Encode(cm.log); err != nil {
log.Fatal(err)
}
cm.storage.Set("log", logData.Bytes())
}
// Submit submits a new command to the CM. This function doesn't block; clients
// read the commit channel passed in the constructor to be notified of new
// committed entries. It returns true iff this CM is the leader - in which case
// the command is accepted. If false is returned, the client will have to find
// a different CM to submit this command to.
func (cm *ConsensusModule) Submit(command interface{}) bool {
cm.mu.Lock()
cm.dlog("Submit received by %v: %v", cm.state, command)
if cm.state == Leader {
cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
cm.persistToStorage()
cm.dlog("... log=%v", cm.log)
cm.mu.Unlock()
cm.triggerAEChan <- struct{}{}
return true
}
cm.mu.Unlock()
return false
}
// RequestVote RPC.
func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.state == Dead {
return nil
}
lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()
cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)
if args.Term > cm.currentTerm {
cm.dlog("... term out of date in RequestVote")
cm.becomeFollower(args.Term)
}
if cm.currentTerm == args.Term &&
(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&
(args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {
reply.VoteGranted = true
cm.votedFor = args.CandidateId
cm.electionResetEvent = time.Now()
} else {
reply.VoteGranted = false
}
reply.Term = cm.currentTerm
cm.persistToStorage()
cm.dlog("... RequestVote reply: %+v", reply)
return nil
}
func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!