Raft source code analysis
<p>This article walks through an implementation of the Raft algorithm at the source-code level, based on a simplified version found online: <a href="/misc/goto?guid=4959754432427820978" rel="nofollow,noindex">source code</a>.</p>
<p>A Server struct represents a single node in the Raft network. Each node creates a Server and exposes it to the other nodes through the peer (Peer) interface.</p>
<p>The transport layer is wrapped over HTTP; peer-to-peer communication uses REST-style HTTP calls.</p>
<pre> <code class="language-go">|http transport| ---> |peers| ---> |server|
</code></pre>
<h2>Project overview</h2>
<h2>Adding and removing nodes</h2>
<p>Dynamic addition and removal of nodes is supported, using a simple joint-consensus rule: a configuration change must be accepted by more than half of the nodes -- more precisely, by a majority of the old cluster and a majority of the new cluster (see configuration.go below).</p>
<h2>Roadmap</h2>
<ul>
<li>Leader election</li>
<li>Log replication</li>
<li>Unit tests</li>
<li>HTTP transport</li>
<li>Configuration changes</li>
</ul>
<p>Beyond that, a few things are still unfinished:</p>
<ul>
<li>A net/rpc transport, or other transport types</li>
<li>Log compaction</li>
<li>Snapshot installation and recovery</li>
<li>A complete demo application</li>
<li>More complex test cases</li>
</ul>
<p>The details are covered in the code analysis below.</p>
<h2>Source code analysis</h2>
<h2>Source tree layout</h2>
<pre> <code class="language-go">├── JOINT-CONSENSUS.md
├── LICENSE
├── README.md
├── configuration.go          // cluster configuration / joint consensus
├── example_test.go           // demo
├── log.go                    // replicated log
├── log_test.go               // log tests
├── peers.go                  // peers
├── peers_test.go             // peer tests
├── rpc.go                    // RPC message types
├── server.go                 // server (core protocol logic)
├── server_internals_test.go  // server internals tests
├── server_test.go            // server tests
├── transport.go              // HTTP transport
└── transport_test.go         // transport tests
</code></pre>
<h2>Main data structures</h2>
<h3>rpc.go</h3>
<pre> <code class="language-go">// appendEntries RPC paired with its response channel
type appendEntriesTuple struct {
	// the append request
	Request appendEntries
	// channel the response is sent back on
	Response chan appendEntriesResponse
}

// requestVote RPC paired with its response channel
type requestVoteTuple struct {
	// the vote request
	Request requestVote
	// channel the vote response is sent back on
	Response chan requestVoteResponse
}

// appendEntries represents an appendEntries RPC.
type appendEntries struct {
	// leader's term
	Term uint64 `json:"term"`
	// leader's id
	LeaderID uint64 `json:"leader_id"`
	// index of the log entry immediately preceding the new ones
	PrevLogIndex uint64 `json:"prev_log_index"`
	// term of that preceding entry
	PrevLogTerm uint64 `json:"prev_log_term"`
	// entries to append -- batched appends are supported
	Entries []logEntry `json:"entries"`
	// leader's commit index
	CommitIndex uint64 `json:"commit_index"`
}

// appendEntriesResponse represents the response to an appendEntries RPC.
type appendEntriesResponse struct {
	// responder's term
	Term uint64 `json:"term"`
	// whether the append succeeded
	Success bool `json:"success"`
	// reason for failure, if any
	reason string
}

// requestVote represents a requestVote RPC.
type requestVote struct {
	// candidate's term
	Term uint64 `json:"term"`
	// candidate's id
	CandidateID uint64 `json:"candidate_id"`
	// index of the candidate's last log entry
	LastLogIndex uint64 `json:"last_log_index"`
	// term of the candidate's last log entry
	LastLogTerm uint64 `json:"last_log_term"`
}

// requestVoteResponse represents the response to a requestVote RPC.
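
// ----------------------------------------------------------------------------
// Illustrative sketch, not part of the original rpc.go: how the tuple/channel
// pattern above is typically consumed. A single server goroutine owns the
// protocol state, receives tuples from the transport, and answers each caller
// on the per-request Response channel, so RPC handlers can simply block on a
// receive. The handle argument is a stand-in for the real handleAppendEntries
// logic shown later in server.go.
func serveAppendEntriesTuples(in <-chan appendEntriesTuple, handle func(appendEntries) appendEntriesResponse) {
	for t := range in {
		t.Response <- handle(t.Request) // reply on the per-request channel
	}
}
// ----------------------------------------------------------------------------
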
// 投票应答 type requestVoteResponse struct { // 应答者任期号 Term uint64 `json:"term"` // 应答结果,true赞同,false反对 VoteGranted bool `json:"vote_granted"` // 反对原因 reason string } </code></pre> <h3>log.go</h3> <p><img src="https://simg.open-open.com/show/f2b9a5aa2ed1d1fbf99ff7b7cd04fd2e.png"></p> <pre> <code class="language-go">var ( // 任期号太小 errTermTooSmall = errors.New("term too small") // 日志索引太小 errIndexTooSmall = errors.New("index too small") // 日志缩影太大 errIndexTooBig = errors.New("commit index too big") // 日志条目内容已损坏 errInvalidChecksum = errors.New("invalid checksum") // 无效的命令 errNoCommand = errors.New("no command") // 错误的日志索引 errBadIndex = errors.New("bad index") // 错误任期号 errBadTerm = errors.New("bad term") ) // 日志结构 type raftLog struct { // 日志读写锁 sync.RWMutex // 日志存储接口 store io.Writer // 日志镜像,现在存储于内存 entries []logEntry // 下一条日志commit索引 commitPos int // "操作"的回调函数,这个函数比较重要,可以"操作集合"镜像, // 在快照时,只需要将"结果"快到存储层即可 apply func(uint64, []byte)[]byte } func newRaftLog(store io.ReadWriter, applyfunc(uint64, []byte) []byte) *raftLog { l := &raftLog{ store: store, entries: []logEntry{}, commitPos: -1, // no commits to begin with apply: apply, } l.recover(store) return l } // recover reads from the log's store, to populate the log with log entries // from persistent storage. It should be called once, at log instantiation. // 日志恢复,当服务重启时,重建日志条目(一般重建都是居于于快照和日志的,可是这里没有实现快照,所以从日志中重建即可) // 1、这里的日志时commited之后的日志,所以重建时,commitPos也会更新 // 2、重建日志条目,会调用apply函数,对日志进行处理,这个函数相当于"状态机"功能;如果有快照(相当于Redis 的RDB),先将安装快照,再恢复日志(相当于Redis 的aof) func (l *raftLog)recover(r io.Reader)error { for { var entry logEntry switch err := entry.decode(r); err { case io.EOF: return nil // successful completion case nil: if err := l.appendEntry(entry); err != nil { return err } l.commitPos++ l.apply(entry.Index, entry.Command) default: return err // unsuccessful completion } } } // entriesAfter returns a slice of log entries after (i.e. not including) the // passed index, and the term of the log entry specified by index, as a // convenience to the caller. (This function is only used by a leader attempting // to flush log entries to its followers.) // // This function is called to populate an AppendEntries RPC. That implies they // are destined for a follower, which implies the application of the commands // should have the response thrown away, which implies we shouldn't pass a // commandResponse channel (see: commitTo implementation). In the normal case, // the raftLogEntries we return here will get serialized as they pass thru their // transport, and lose their commandResponse channel anyway. But in the case of // a LocalPeer (or equivalent) this doesn't happen. So, we must make sure to // proactively strip commandResponse channels. // 检索index之后的日志条目 func (l *raftLog)entriesAfter(indexuint64)([]logEntry,uint64) { l.RLock() defer l.RUnlock() // 1.检索出index对应term以及在实体集合entries中的位置Pos pos := 0 lastTerm := uint64(0) for ; pos < len(l.entries); pos++ { if l.entries[pos].Index > index { break } lastTerm = l.entries[pos].Term } a := l.entries[pos:] if len(a) == 0 { return []logEntry{}, lastTerm } // 除去command Response channel return stripResponseChannels(a), lastTerm } func stripResponseChannels(a []logEntry)[]logEntry { stripped := make([]logEntry, len(a)) for i, entry := range a { stripped[i] = logEntry{ Index: entry.Index, Term: entry.Term, Command: entry.Command, commandResponse: nil, } } return stripped } // contains returns true if a log entry with the given index and term exists in // the log. 
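
// ----------------------------------------------------------------------------
// Illustrative sketch, not part of the original log.go: constructing a raftLog
// with an in-memory bytes.Buffer as the "persistent" store and a trivial apply
// function -- the same pattern the demo at the end of the article uses through
// NewServer. recover() replays whatever entries the store already holds, so a
// fresh buffer simply yields an empty log. (Assumes "bytes" is imported.)
func newInMemoryRaftLog() *raftLog {
	store := &bytes.Buffer{} // stands in for a real log file
	apply := func(index uint64, cmd []byte) []byte {
		return cmd // echo state machine: the "result" is just the command itself
	}
	return newRaftLog(store, apply)
}
// ----------------------------------------------------------------------------
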
// 判断是够包含{term, index}条目 func (l *raftLog)contains(index, termuint64)bool { l.RLock() defer l.RUnlock() // It's not necessarily true that l.entries[i] has index == i. for _, entry := range l.entries { if entry.Index == index && entry.Term == term { return true } if entry.Index > index || entry.Term > term { break } } return false } // 判断{term, index}是否为最新的日志条目,如果是,则将则将在其之后的日志清理掉, // 这个条目应该在[commit_index, last_index]范围内 func (l *raftLog)ensureLastIs(index, termuint64)error { l.Lock() defer l.Unlock() // Taken loosely from benbjohnson's impl if index < l.getCommitIndexWithLock() { return errIndexTooSmall } if index > l.lastIndexWithLock() { return errIndexTooBig } // It's possible that the passed index is 0. It means the leader has come to // decide we need a complete log rebuild. Of course, that's only valid if we // haven't committed anything, so this check comes after that one. // 全部重建,前提是没有commited过任何的条目 if index == 0 { for pos := 0; pos < len(l.entries); pos++ { if l.entries[pos].commandResponse != nil { close(l.entries[pos].commandResponse) l.entries[pos].commandResponse = nil } if l.entries[pos].committed != nil { l.entries[pos].committed <- false close(l.entries[pos].committed) l.entries[pos].committed = nil } } l.entries = []logEntry{} return nil } // Normal case: find the position of the matching log entry. pos := 0 for ; pos < len(l.entries); pos++ { if l.entries[pos].Index < index { continue // didn't find it yet } if l.entries[pos].Index > index { return errBadIndex // somehow went past it } if l.entries[pos].Index != index { panic("not <, not >, but somehow !=") } if l.entries[pos].Term != term { return errBadTerm } break // good } // Sanity check. // ? 怎么可能出现这种情况? if pos < l.commitPos { panic("index >= commitIndex, but pos < commitPos") } // `pos` is the position of log entry matching index and term. // We want to truncate everything after that. // 应为{term, index}是最新的了,所以将在其之后的所有条目给cut掉 truncateFrom := pos + 1 if truncateFrom >= len(l.entries) { return nil // nothing to truncate } // If we blow away log entries that haven't yet sent responses to clients, // signal the clients to stop waiting, by closing the channel without a // response value. for pos = truncateFrom; pos < len(l.entries); pos++ { if l.entries[pos].commandResponse != nil { close(l.entries[pos].commandResponse) l.entries[pos].commandResponse = nil } if l.entries[pos].committed != nil { l.entries[pos].committed <- false close(l.entries[pos].committed) l.entries[pos].committed = nil } } // Truncate the log. l.entries = l.entries[:truncateFrom] // Done. return nil } // getCommitIndex returns the commit index of the log. That is, the index of the // last log entry which can be considered committed. // 获取最新的commited日志条目 func (l *raftLog)getCommitIndex()uint64 { l.RLock() defer l.RUnlock() return l.getCommitIndexWithLock() } // 获取最新的日志条目 func (l *raftLog)getCommitIndexWithLock()uint64 { if l.commitPos < 0 { return 0 } if l.commitPos >= len(l.entries) { panic(fmt.Sprintf("commitPos %d > len(l.entries) %d; bad bookkeeping in raftLog", l.commitPos, len(l.entries))) } return l.entries[l.commitPos].Index } // lastIndex returns the index of the most recent log entry. func (l *raftLog)lastIndex()uint64 { l.RLock() defer l.RUnlock() return l.lastIndexWithLock() } func (l *raftLog)lastIndexWithLock()uint64 { if len(l.entries) <= 0 { return 0 } return l.entries[len(l.entries)-1].Index } // lastTerm returns the term of the most recent log entry. 
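
// ----------------------------------------------------------------------------
// Illustrative sketch, not part of the original log.go: the truncation
// semantics of ensureLastIs, written as a tiny test-style scenario. After
// appending entries 1..3 in term 1, ensureLastIs(2, 1) declares index 2 the
// last valid entry and drops entry 3, so lastIndex() reports 2.
// (Assumes "bytes" and "fmt" are imported.)
func exampleEnsureLastIs() error {
	l := newRaftLog(&bytes.Buffer{}, func(uint64, []byte) []byte { return nil })
	for i := uint64(1); i <= 3; i++ {
		if err := l.appendEntry(logEntry{Index: i, Term: 1, Command: []byte("x")}); err != nil {
			return err
		}
	}
	if err := l.ensureLastIs(2, 1); err != nil { // truncate everything after index 2
		return err
	}
	fmt.Println(l.lastIndex()) // prints 2
	return nil
}
// ----------------------------------------------------------------------------
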
func (l *raftLog)lastTerm()uint64 { l.RLock() defer l.RUnlock() return l.lastTermWithLock() } func (l *raftLog)lastTermWithLock()uint64 { if len(l.entries) <= 0 { return 0 } return l.entries[len(l.entries)-1].Term } // appendEntry appends the passed log entry to the log. It will return an error // if the entry's term is smaller than the log's most recent term, or if the // entry's index is too small relative to the log's most recent entry. // 追加日志,注意此时还没有commit该条目 func (l *raftLog)appendEntry(entry logEntry)error { l.Lock() defer l.Unlock() // 判定{entry.term, entry.index} > {last_term, last_index} if len(l.entries) > 0 { lastTerm := l.lastTermWithLock() if entry.Term < lastTerm { return errTermTooSmall } lastIndex := l.lastIndexWithLock() if entry.Term == lastTerm && entry.Index <= lastIndex { return errIndexTooSmall } } l.entries = append(l.entries, entry) return nil } // commitTo commits all log entries up to and including the passed commitIndex. // Commit means: synchronize the log entry to persistent storage, and call the // state machine apply function for the log entry's command. // 注意: // 1、commit是一个后端任务,再此并没有"1/2"确认的概念(实际上是不是这样呢,这得去参考raft的论文了) // 2、apply函数是在commit过程中调用,而不是在append的时候调用 // 3、apply相当于状态机函数,一般用户会将这些操作结果保存起来,用于快照 // 比如,想实现一个kv存储,那么用户只要将kv相关的逻辑植入这个函数即可 // committed <= commitIndex <= last_index func (l *raftLog)commitTo(commitIndexuint64)error { if commitIndex == 0 { panic("commitTo(0)") } l.Lock() defer l.Unlock() // Reject old commit indexes if commitIndex < l.getCommitIndexWithLock() { return errIndexTooSmall } // Reject new commit indexes if commitIndex > l.lastIndexWithLock() { return errIndexTooBig } // If we've already committed to the commitIndex, great! if commitIndex == l.getCommitIndexWithLock() { return nil } // We should start committing at precisely the last commitPos + 1 pos := l.commitPos + 1 if pos < 0 { panic("pending commit pos < 0") } // Commit entries between our existing commit index and the passed index. // Remember to include the passed index. for { // Sanity checks. TODO replace with plain `for` when this is stable. if pos >= len(l.entries) { panic(fmt.Sprintf("commitTo pos=%d advanced past all log entries (%d)", pos, len(l.entries))) } if l.entries[pos].Index > commitIndex { panic("commitTo advanced past the desired commitIndex") } // Encode the entry to persistent storage. if err := l.entries[pos].encode(l.store); err != nil { return err } // Forward non-configuration commands to the state machine. // Send the responses to the waiting client, if applicable. // 如果不是配置类型的Log,则调用apply function // 配置类型的Log,在其他地方处理 if !l.entries[pos].isConfiguration { resp := l.apply(l.entries[pos].Index, l.entries[pos].Command) if l.entries[pos].commandResponse != nil { select { case l.entries[pos].commandResponse <- resp: break // 问什么选取这个时间??? case <-time.After(maximumElectionTimeout()): // << ElectionInterval panic("uncoöperative command response receiver") } close(l.entries[pos].commandResponse) l.entries[pos].commandResponse = nil } } // Signal the entry has been committed, if applicable. if l.entries[pos].committed != nil { l.entries[pos].committed <- true close(l.entries[pos].committed) l.entries[pos].committed = nil } // Mark our commit position cursor. l.commitPos = pos // If that was the last one, we're done. if l.entries[pos].Index == commitIndex { break } if l.entries[pos].Index > commitIndex { panic(fmt.Sprintf( "current entry Index %d is beyond our desired commitIndex %d", l.entries[pos].Index, commitIndex, )) } // Otherwise, advance! pos++ } // Done. 
return nil } // logEntry is the atomic unit being managed by the distributed log. A log entry // always has an index (monotonically increasing), a term in which the Raft // network leader first sees the entry, and a command. The command is what gets // executed against the node state machine when the log entry is successfully // replicated. type logEntry struct { // 日志索引号 Index uint64 `json:"index"` // 任期号 Term uint64 `json:"term"` // when received by leader // 日志内容 Command []byte `json:"command,omitempty"` // commited 通道 committed chan bool `json:"-"` // 命令应答 通道 commandResponse chan<- []byte `json:"-"` // only non-nil on receiver's log // 日志类型标示 isConfiguration bool `json:"-"` // for configuration change entries } // encode serializes the log entry to the passed io.Writer. // // Entries are serialized in a simple binary format: // // --------------------------------------------- // | uint32 | uint64 | uint64 | uint32 | []byte | // --------------------------------------------- // | CRC | TERM | INDEX | SIZE | COMMAND | // --------------------------------------------- // // 序列化,大端 func (e *logEntry)encode(w io.Writer)error { if len(e.Command) <= 0 { return errNoCommand } if e.Index <= 0 { return errBadIndex } if e.Term <= 0 { return errBadTerm } commandSize := len(e.Command) buf := make([]byte, 24+commandSize) binary.LittleEndian.PutUint64(buf[4:12], e.Term) binary.LittleEndian.PutUint64(buf[12:20], e.Index) binary.LittleEndian.PutUint32(buf[20:24], uint32(commandSize)) copy(buf[24:], e.Command) binary.LittleEndian.PutUint32( buf[0:4], crc32.ChecksumIEEE(buf[4:]), ) _, err := w.Write(buf) return err } // 反序列化 // decode deserializes one log entry from the passed io.Reader. func (e *logEntry)decode(r io.Reader)error { header := make([]byte, 24) if _, err := r.Read(header); err != nil { return err } command := make([]byte, binary.LittleEndian.Uint32(header[20:24])) if _, err := r.Read(command); err != nil { return err } crc := binary.LittleEndian.Uint32(header[:4]) check := crc32.NewIEEE() check.Write(header[4:]) check.Write(command) if crc != check.Sum32() { return errInvalidChecksum } e.Term = binary.LittleEndian.Uint64(header[4:12]) e.Index = binary.LittleEndian.Uint64(header[12:20]) e.Command = command return nil } </code></pre> <h3>Peers.go</h3> <pre> <code class="language-go">var ( errTimeout = errors.New("timeout") ) // peers为节点的一个抽象,对外提供了一些访问接口, // 需要注意的地方是peers的序列化 type Peer interface { // 返回server标示 id() uint64 // 日志追加接口 callAppendEntries(appendEntries) appendEntriesResponse // 投票选举接口 callRequestVote(requestVote) requestVoteResponse // 命令调用 callCommand([]byte, chan<- []byte) error // 集群配置变化接口 callSetConfiguration(...Peer) error } // localPeer is the simplest kind of peer, mapped to a server in the // same process-space. Useful for testing and demonstration; not so // useful for networks of independent processes. // 本地local peers,用于测试,不用经过网络 type localPeer struct { server *Server } func newLocalPeer(server *Server)*localPeer { return &localPeer{server} } func (p *localPeer)id()uint64 { return p.server.id } // 追加日志 func (p *localPeer)callAppendEntries(ae appendEntries)appendEntriesResponse { return p.server.appendEntries(ae) } // 投票选举 func (p *localPeer)callRequestVote(rv requestVote)requestVoteResponse { return p.server.requestVote(rv) } // 命令 // 实际调用为Leader func (p *localPeer)callCommand(cmd []byte, responsechan<- []byte)error { return p.server.Command(cmd, response) } // 设置配置 func (p *localPeer)callSetConfiguration(peers ...Peer)error { return p.server.SetConfiguration(peers...) 
} // requestVoteTimeout issues the requestVote to the given peer. // If no response is received before timeout, an error is returned. // 投票 func requestVoteTimeout(p Peer, rv requestVote, timeout time.Duration)(requestVoteResponse, error) { c := make(chan requestVoteResponse, 1) go func() { c <- p.callRequestVote(rv) }() select { case resp := <-c: return resp, nil case <-time.After(timeout): return requestVoteResponse{}, errTimeout } } // peerMap is a collection of Peer interfaces. It provides some convenience // functions for actions that should apply to multiple Peers. type peerMap map[uint64]Peer // makePeerMap constructs a peerMap from a list of peers. func makePeerMap(peers ...Peer)peerMap { pm := peerMap{} for _, peer := range peers { pm[peer.id()] = peer } return pm } // explodePeerMap converts a peerMap into a slice of peers. func explodePeerMap(pm peerMap)[]Peer { a := []Peer{} for _, peer := range pm { a = append(a, peer) } return a } func (pm peerMap)except(iduint64)peerMap { except := peerMap{} for id0, peer := range pm { if id0 == id { continue } except[id0] = peer } return except } func (pm peerMap)count()int { return len(pm) } // 法定人数 func (pm peerMap)quorum()int { switch n := len(pm); n { case 0, 1: return 1 default: return (n / 2) + 1 } } // requestVotes sends the passed requestVote RPC to every peer in Peers. It // forwards responses along the returned requestVoteResponse channel. It makes // the RPCs with a timeout of BroadcastInterval * 2 (chosen arbitrarily). Peers // that don't respond within the timeout are retried forever. The retry loop // stops only when all peers have responded, or a Cancel signal is sent via the // returned canceler. func (pm peerMap)requestVotes(r requestVote)(chanvoteResponseTuple, canceler) { // "[A server entering the candidate stage] issues requestVote RPCs in // parallel to each of the other servers in the cluster. If the candidate // receives no response for an RPC, it reissues the RPC repeatedly until a // response arrives or the election concludes." // construct the channels we'll return abortChan := make(chan struct{}) tupleChan := make(chan voteResponseTuple) go func() { // We loop until all Peers have given us a response. // Track which Peers have responded. 
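// What follows is a scatter/gather loop with retry: each pass re-sends
// requestVote only to peers that have not answered yet, gathers that pass's
// responses with a timeout, forwards successful responses to the caller, and
// repeats until every peer has responded or the returned canceler closes the
// abort channel.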
respondedAlready := peerMap{} // none yet for { notYetResponded := disjoint(pm, respondedAlready) if len(notYetResponded) <= 0 { return // done } // scatter tupleChan0 := make(chan voteResponseTuple, len(notYetResponded)) for id, peer := range notYetResponded { go func(iduint64, peer Peer) { resp, err := requestVoteTimeout(peer, r, 2*maximumElectionTimeout()) tupleChan0 <- voteResponseTuple{id, resp, err} }(id, peer) } // gather for i := 0; i < cap(tupleChan0); i++ { select { case t := <-tupleChan0: if t.err != nil { continue // will need to retry } respondedAlready[t.id] = nil // set membership semantics tupleChan <- t case <-abortChan: return // give up } } } }() return tupleChan, cancel(abortChan) } // 选举应答 type voteResponseTuple struct { id uint64 response requestVoteResponse err error } type canceler interface { Cancel() } type cancel chan struct{} func (c cancel)Cancel() { close(c) } // 过滤peers func disjoint(all, except peerMap)peerMap { d := peerMap{} for id, peer := range all { if _, ok := except[id]; ok { continue } d[id] = peer } return d } </code></pre> <h3>server.go</h3> <p>这是最重要的一个逻辑</p> <p><img src="https://simg.open-open.com/show/0ce67b467bfb0c5540095525dbfa59ec.png"></p> <p>几点配置变更</p> <p><img src="https://simg.open-open.com/show/19442432774f8731e27646368e69c044.png"></p> <pre> <code class="language-go">// 角色分类 const ( follower = "Follower" candidate = "Candidate" leader = "Leader" ) const ( unknownLeader = 0 noVote = 0 ) // 选举时间随机范围[MinimumElectionTimeoutMS, maximumElectionTimeoutMS] var ( MinimumElectionTimeoutMS int32 = 250 maximumElectionTimeoutMS = 2 * MinimumElectionTimeoutMS ) var ( errNotLeader = errors.New("not the leader") errUnknownLeader = errors.New("unknown leader") errDeposed = errors.New("deposed during replication") errAppendE#008000ntriesRejected = errors.New("appendEntries RPC rejected") errReplicationFailed = errors.New("command replication failed (but will keep retrying)") errOutOfSync = errors.New("out of sync") errAlreadyRunning = errors.New("already running") ) // 重置选举时间 func resetElectionTimeoutMS(newMin, newMaxint)(int,int) { oldMin := atomic.LoadInt32(&MinimumElectionTimeoutMS) oldMax := atomic.LoadInt32(&maximumElectionTimeoutMS) atomic.StoreInt32(&MinimumElectionTimeoutMS, int32(newMin)) atomic.StoreInt32(&maximumElectionTimeoutMS, int32(newMax)) return int(oldMin), int(oldMax) } // minimumElectionTimeout returns the current minimum election timeout. func minimumElectionTimeout()time.Duration { return time.Duration(MinimumElectionTimeoutMS) * time.Millisecond } // maximumElectionTimeout returns the current maximum election time. func maximumElectionTimeout()time.Duration { return time.Duration(maximumElectionTimeoutMS) * time.Millisecond } // 选举时间随机函数 func electionTimeout()time.Duration { n := rand.Intn(int(maximumElectionTimeoutMS - MinimumElectionTimeoutMS)) d := int(MinimumElectionTimeoutMS) + n return time.Duration(d) * time.Millisecond } // broadcastInterval returns the interval between heartbeats (AppendEntry RPCs) // broadcast from the leader. It is the minimum election timeout / 10, as // dictated by the spec: BroadcastInterval << ElectionTimeout << MTBF. // 广播时间,用于Leader发送心跳广播,这个时间应小于选举时间;否则,非Leader节点会产生选举操作 func broadcastInterval()time.Duration { d := MinimumElectionTimeoutMS / 10 return time.Duration(d) * time.Millisecond } // protectedString is just a string protected by a mutex. 
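
// ----------------------------------------------------------------------------
// Illustrative sketch, not part of the original server.go: the timing
// relationship the functions above encode. With the defaults (min=250ms,
// max=2*min=500ms) the leader heartbeats every 25ms, well below the smallest
// possible election timeout -- which is exactly what keeps healthy followers
// from starting elections. (Assumes "fmt" is imported.)
func printTimingInvariant() {
	fmt.Printf("broadcast=%v minElection=%v maxElection=%v\n",
		broadcastInterval(), minimumElectionTimeout(), maximumElectionTimeout())
	if broadcastInterval() >= minimumElectionTimeout() {
		panic("heartbeat interval must stay well below the election timeout")
	}
}
// ----------------------------------------------------------------------------
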
type protectedString struct { sync.RWMutex value string } func (s *protectedString)Get()string { s.RLock() defer s.RUnlock() return s.value } func (s *protectedString)Set(valuestring) { s.Lock() defer s.Unlock() s.value = value } // protectedBool is just a bool protected by a mutex. type protectedBool struct { sync.RWMutex value bool } func (s *protectedBool)Get()bool { s.RLock() defer s.RUnlock() return s.value } func (s *protectedBool)Set(valuebool) { s.Lock() defer s.Unlock() s.value = value } // Server is the agent that performs all of the Raft protocol logic. // In a typical application, each running process that wants to be part of // the distributed state machine will contain a server component. type Server struct { id uint64 // id of this server // 节点状态 state *protectedString // 节点运行状态 running *protectedBool // Leader节点标示 leader uint64 // 当前节点任期号 term uint64 // "current term number, which increases monotonically" // 0表示,当前节点还有投出自己的票;非零表示节点已经投票了,值是获票者的标示ID vote uint64 // who we voted for this term, if applicable log *raftLog config *configuration // 追加日志信道 appendEntriesChan chan appendEntriesTuple // 投票信道 requestVoteChan chan requestVoteTuple // 命令信道 commandChan chan commandTuple // 配置修改信道 configurationChan chan configurationTuple // 选举信道 electionTick <-chan time.Time // 退出信道 quit chan chan struct{} } // 状态机函数 // 该函数不可并发执行,否则就达不到一致性状态机的效果(执行时间不要超过选举时间) // 正常来说,只有"共识"达成的时候,才会调用该函数,然后返回给客户端 // 但是,在这里为了简化实现,"共识“算法是放在后台任务操作的,客户端发送命令单Leader时,Leader马上 // 应答客户端,并没有等”共识算法“的共识结果 type ApplyFunc func(commitIndexuint64, cmd []byte)[]byte // 初始化节点 // 1. 构建日志 2.初始化为"follower"角色 3.leader为"unknown" func NewServer(iduint64, store io.ReadWriter, a ApplyFunc) *Server { if id <= 0 { panic("server id must be > 0") } // 5.2 Leader election: "the latest term this server has seen is persisted, // and is initialized to 0 on first boot." log := newRaftLog(store, a) latestTerm := log.lastTerm() s := &Server{ id: id, state: &protectedString{value: follower}, // "when servers start up they begin as followers" running: &protectedBool{value: false}, leader: unknownLeader, // unknown at startup log: log, term: latestTerm, config: newConfiguration(peerMap{}), appendEntriesChan: make(chan appendEntriesTuple), requestVoteChan: make(chan requestVoteTuple), commandChan: make(chan commandTuple), configurationChan: make(chan configurationTuple), electionTick: nil, quit: make(chan chan struct{}), } s.resetElectionTimeout() return s } type configurationTuple struct { Peers []Peer Err chan error } // 设置配置 // 1. 服务启动时,先设置配置 // 2. 集群变更时,设置配置 func (s *Server)SetConfiguration(peers ...Peer)error { // 节点刚启动 if !s.running.Get() { s.config.directSet(makePeerMap(peers...)) return nil } err := make(chan error) // 节点已经启动了 s.configurationChan <- configurationTuple{peers, err} return <-err } // Start triggers the server to begin communicating with its peers. func (s *Server)Start() { go s.loop() } // Stop terminates the server. Stopped servers should not be restarted. 
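
// ----------------------------------------------------------------------------
// Illustrative sketch, not part of the original server.go: the minimal
// bring-up sequence the API above implies -- build a server over an in-memory
// store, set the bootstrap configuration (here just the server itself, via a
// localPeer), then start the main loop. With a single-node configuration the
// quorum is 1, so the node elects itself leader on its first election timeout.
// A full multi-node example over HTTP appears in the demo at the end of the
// article. (Assumes "bytes" is imported.)
func startSingleNode() *Server {
	s := NewServer(1, &bytes.Buffer{}, func(idx uint64, cmd []byte) []byte { return cmd })
	if err := s.SetConfiguration(newLocalPeer(s)); err != nil { // bootstrap before Start
		panic(err)
	}
	s.Start()
	return s
}
// ----------------------------------------------------------------------------
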
func (s *Server)Stop() { q := make(chan struct{}) s.quit <- q <-q s.logGeneric("server stopped") } // 命令元组 type commandTuple struct { // 命令内容 Command []byte // 命令信道 CommandResponse chan<- []byte Err chan error } // 命令接口 func (s *Server)Command(cmd []byte, responsechan<- []byte)error { err := make(chan error) s.commandChan <- commandTuple{cmd, response, err} return <-err } // 日志追加 func (s *Server)appendEntries(ae appendEntries)appendEntriesResponse { t := appendEntriesTuple{ Request: ae, Response: make(chan appendEntriesResponse), } s.appendEntriesChan <- t return <-t.Response } // 投票 func (s *Server)requestVote(rv requestVote)requestVoteResponse { t := requestVoteTuple{ Request: rv, Response: make(chan requestVoteResponse), } s.requestVoteChan <- t return <-t.Response } // times out, // new election // | .-----. // | | | // v times out, | v receives votes from // +----------+ starts election +-----------+ majority of servers +--------+ // | Follower |------------------>| Candidate |---------------------->| Leader | // +----------+ +-----------+ +--------+ // ^ ^ | | // | | discovers current leader | | // | | or new term | | // | '------------------------------' | // | | // | discovers server with higher term | // '------------------------------------------------------------------' // // func (s *Server)loop() { s.running.Set(true) for s.running.Get() { switch state := s.state.Get(); state { case follower: s.followerSelect() case candidate: s.candidateSelect() case leader: s.leaderSelect() default: panic(fmt.Sprintf("unknown Server State '%s'", state)) } } } func (s *Server)resetElectionTimeout() { s.electionTick = time.NewTimer(electionTimeout()).C } func (s *Server)logGeneric(formatstring, args ...interface{}) { prefix := fmt.Sprintf("id=%d term=%d state=%s: ", s.id, s.term, s.state.Get()) log.Printf(prefix+format, args...) } func (s *Server)logAppendEntriesResponse(req appendEntries, resp appendEntriesResponse, stepDownbool) { s.logGeneric( "got appendEntries, sz=%d leader=%d prevIndex/Term=%d/%d commitIndex=%d: responded with success=%v (reason='%s') stepDown=%v", len(req.Entries), req.LeaderID, req.PrevLogIndex, req.PrevLogTerm, req.CommitIndex, resp.Success, resp.reason, stepDown, ) } func (s *Server)logRequestVoteResponse(req requestVote, resp requestVoteResponse, stepDownbool) { s.logGeneric( "got RequestVote, candidate=%d: responded with granted=%v (reason='%s') stepDown=%v", req.CandidateID, resp.VoteGranted, resp.reason, stepDown, ) } func (s *Server)handleQuit(qchan struct{}) { s.logGeneric("got quit signal") s.running.Set(false) close(q) } // 命令转发 // 如果当前节点不是Leader节点,并且已存在Leader节点,则其会以"代理“的角色,将命令转发至Leader节点 func (s *Server)forwardCommand(t commandTuple) { switch s.leader { case unknownLeader: s.logGeneric("got command, but don't know leader") t.Err <- errUnknownLeader case s.id: // I am the leader panic("impossible state in forwardCommand") default: leader, ok := s.config.get(s.leader) if !ok { panic("invalid state in peers") } s.logGeneric("got command, forwarding to leader (%d)", s.leader) // We're blocking our {follower,candidate}Select function in the // receive-command branch. If we continue to block while forwarding // the command, the leader won't be able to get a response from us! 
go func() { t.Err <- leader.callCommand(t.Command, t.CommandResponse) }() } } // 配置变更 // 转发规则和命令转发一样 func (s *Server)forwardConfiguration(t configurationTuple) { switch s.leader { case unknownLeader: s.logGeneric("got configuration, but don't know leader") t.Err <- errUnknownLeader case s.id: // I am the leader panic("impossible state in forwardConfiguration") default: leader, ok := s.config.get(s.leader) if !ok { panic("invalid state in peers") } s.logGeneric("got configuration, forwarding to leader (%d)", s.leader) go func() { t.Err <- leader.callSetConfiguration(t.Peers...) }() } } // follower 节点逻辑 func (s *Server)followerSelect() { for { select { case q := <-s.quit: s.handleQuit(q) return // 命令转发 case t := <-s.commandChan: s.forwardCommand(t) // 集群变更转发 case t := <-s.configurationChan: s.forwardConfiguration(t) // Leader选举 case <-s.electionTick: // 5.2 Leader election: "A follower increments its current term and // transitions to candidate state." if s.config == nil { s.logGeneric("election timeout, but no configuration: ignoring") s.resetElectionTimeout() continue } s.logGeneric("election timeout, becoming candidate") // 提高自己的任期号 s.term++ // 投票置为空 s.vote = noVote // Leader s.leader = unknownLeader // 设置节点角色为"候选人" s.state.Set(candidate) // 重置选举时间,防止马上再次出发选举 s.resetElectionTimeout() return // 日志追加(除了客户端请求,leader的心跳也会出发这个行为) case t := <-s.appendEntriesChan: if s.leader == unknownLeader { s.leader = t.Request.LeaderID s.logGeneric("discovered Leader %d", s.leader) } // 处理日志最佳操作 resp, stepDown := s.handleAppendEntries(t.Request) s.logAppendEntriesResponse(t.Request, resp, stepDown) t.Response <- resp // 如果节点已经脱离了当前的集群,需要跟新Leader地址 if stepDown { // stepDown as a Follower means just to reset the leader if s.leader != unknownLeader { s.logGeneric("abandoning old leader=%d", s.leader) } s.logGeneric("following new leader=%d", t.Request.LeaderID) s.leader = t.Request.LeaderID } // 选举 case t := <-s.requestVoteChan: // 选举处理 resp, stepDown := s.handleRequestVote(t.Request) s.logRequestVoteResponse(t.Request, resp, stepDown) t.Response <- resp // 如果落后于当前节点了,把当前的Leader修改为"unkownleader",等待讯据成功后,进行切换 if stepDown { // stepDown as a Follower means just to reset the leader if s.leader != unknownLeader { s.logGeneric("abandoning old leader=%d", s.leader) } s.logGeneric("new leader unknown") s.leader = unknownLeader } } } } // 候选状态 func (s *Server)candidateSelect() { if s.leader != unknownLeader { panic("known leader when entering candidateSelect") } if s.vote != 0 { panic("existing vote when entering candidateSelect") } // "[A server entering the candidate stage] issues requestVote RPCs in // parallel to each of the other servers in the cluster. If the candidate // receives no response for an RPC, it reissues the RPC repeatedly until a // response arrives or the election concludes." 
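// What follows: broadcast requestVote to every other peer in the current
// (possibly joint) configuration, record a vote for ourselves, and check the
// quorum immediately -- that early check can only pass in a single-node
// cluster. The select loop afterwards waits for one of the three outcomes
// quoted above.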
// 发起选举RPC requestVoteResponses, canceler := s.config.allPeers().except(s.id).requestVotes(requestVote{ Term: s.term, CandidateID: s.id, LastLogIndex: s.log.lastIndex(), LastLogTerm: s.log.lastTerm(), }) defer canceler.Cancel() // Set up vote tallies (plus, vote for myself) votes := map[uint64]bool{s.id: true} s.vote = s.id s.logGeneric("term=%d election started (configuration state %s)", s.term, s.config.state) // 如果已经达到了选举“共识”,则成功选举 if s.config.pass(votes) { s.logGeneric("I immediately won the election") s.leader = s.id s.state.Set(leader) s.vote = noVote return } // "A candidate continues in this state until one of three things happens: // (a) it wins the election, (b) another server establishes itself as // leader, or (c) a period of time goes by with no winner." for { select { case q := <-s.quit: s.handleQuit(q) return // 命令转发 case t := <-s.commandChan: s.forwardCommand(t) // 配置更新转发,注意和Leader的不同 case t := <-s.configurationChan: s.forwardConfiguration(t) // 收到选举的应答 case t := <-requestVoteResponses: s.logGeneric("got vote: id=%d term=%d granted=%v", t.id, t.response.Term, t.response.VoteGranted) // "A candidate wins the election if it receives votes from a // majority of servers in the full cluster for the same term." // 本节点落后于其他几点 if t.response.Term > s.term { s.logGeneric("got vote from future term (%d>%d); abandoning election", t.response.Term, s.term) s.leader = unknownLeader s.state.Set(follower) s.vote = noVote return // lose } // 收到了"落后"当前节点的应答,忽略掉它 if t.response.Term < s.term { s.logGeneric("got vote from past term (%d<%d); ignoring", t.response.Term, s.term) break } // 收到赞同票 if t.response.VoteGranted { s.logGeneric("%d voted for me", t.id) votes[t.id] = true } // "Once a candidate wins an election, it becomes leader." // “共识”达成 if s.config.pass(votes) { s.logGeneric("I won the election") s.leader = s.id s.state.Set(leader) s.vote = noVote return // win } // 收到日志追加(在这里,心跳也当做日志追加的方式发送) case t := <-s.appendEntriesChan: // "While waiting for votes, a candidate may receive an // appendEntries RPC from another server claiming to be leader. // If the leader's term (included in its RPC) is at least as // large as the candidate's current term, then the candidate // recognizes the leader as legitimate and steps down, meaning // that it returns to follower state." // 处理日志 resp, stepDown := s.handleAppendEntries(t.Request) s.logAppendEntriesResponse(t.Request, resp, stepDown) t.Response <- resp // candidate节点落后于Leader节点 if stepDown { s.logGeneric("after an appendEntries, stepping down to Follower (leader=%d)", t.Request.LeaderID) s.leader = t.Request.LeaderID s.state.Set(follower) return // lose } // 虽然当前节点是candidate节点,但集群中此时可能存在多个candidate节点 case t := <-s.requestVoteChan: // We can also be defeated by a more recent candidate resp, stepDown := s.handleRequestVote(t.Request) s.logRequestVoteResponse(t.Request, resp, stepDown) t.Response <- resp if stepDown { // 当前candidate节点落后于集群中已存在的candidate节点,将自己的角色变为follower, // 并且也会投赞同票 s.logGeneric("after a requestVote, stepping down to Follower (leader unknown)") s.leader = unknownLeader s.state.Set(follower) return // lose } // 选举 case <-s.electionTick: // "The third possible outcome is that a candidate neither wins nor // loses the election: if many followers become candidates at the // same time, votes could be split so that no candidate obtains a // majority. When this happens, each candidate will start a new // election by incrementing its term and initiating another round of // requestVote RPCs." 
s.logGeneric("election ended with no winner; incrementing term and trying again") s.resetElectionTimeout() s.term++ s.vote = noVote return // draw } } } // Leader 保存的Follower节点的所有最新同步条目 type nextIndex struct { sync.RWMutex m map[uint64]uint64 // followerId: nextIndex } func newNextIndex(pm peerMap, defaultNextIndexuint64)*nextIndex { ni := &nextIndex{ m: map[uint64]uint64{}, } for id := range pm { ni.m[id] = defaultNextIndex } return ni } // 找出已经同步Follower的最小日志 func (ni *nextIndex)bestIndex()uint64 { ni.RLock() defer ni.RUnlock() if len(ni.m) <= 0 { return 0 } i := uint64(math.MaxUint64) for _, nextIndex := range ni.m { if nextIndex < i { i = nextIndex } } return i } // 返回节点(id)最新的同步日志 func (ni *nextIndex)prevLogIndex(iduint64)uint64 { ni.RLock() defer ni.RUnlock() if _, ok := ni.m[id]; !ok { panic(fmt.Sprintf("peer %d not found", id)) } return ni.m[id] } // 自减节点(id)的最新同步日志,用于同步失败时的回滚 func (ni *nextIndex)decrement(iduint64, prevuint64)(uint64, error) { ni.Lock() defer ni.Unlock() i, ok := ni.m[id] if !ok { panic(fmt.Sprintf("peer %d not found", id)) } if i != prev { return i, errOutOfSync } if i > 0 { ni.m[id]-- } return ni.m[id], nil } // 更新节点(id)的同步日志 func (ni *nextIndex)set(id, index, prevuint64)(uint64, error) { ni.Lock() defer ni.Unlock() i, ok := ni.m[id] if !ok { panic(fmt.Sprintf("peer %d not found", id)) } if i != prev { return i, errOutOfSync } ni.m[id] = index return index, nil } // 心跳、复制命令都会用到该函数,flush是同步的,如果对端节点不可达,则阻塞 func (s *Server)flush(peer Peer, ni *nextIndex)error { peerID := peer.id() // Leader的任期号 currentTerm := s.term // 节点(peer)的最新同步索引 prevLogIndex := ni.prevLogIndex(peerID) // 检索出peers节点落后于Leader几点的日志条目,然后进行同步 entries, prevLogTerm := s.log.entriesAfter(prevLogIndex) // 获取Leader committed的最新索引 commitIndex := s.log.getCommitIndex() s.logGeneric("flush to %d: term=%d leaderId=%d prevLogIndex/Term=%d/%d sz=%d commitIndex=%d", peerID, currentTerm, s.id, prevLogIndex, prevLogTerm, len(entries), commitIndex) // 日志追加RPC resp := peer.callAppendEntries(appendEntries{ Term: currentTerm, LeaderID: s.id, PrevLogIndex: prevLogIndex, PrevLogTerm: prevLogTerm, Entries: entries, CommitIndex: commitIndex, }) if resp.Term > currentTerm { // 应答的节点比当前节点的任期号大,当前的Leader被罢免 s.logGeneric("flush to %d: responseTerm=%d > currentTerm=%d: deposed", peerID, resp.Term, currentTerm) return errDeposed } if !resp.Success { // 应答失败,可能是leader RPC等待超时,或者出现了网络错误(包括脑裂),回滚 newPrevLogIndex, err := ni.decrement(peerID, prevLogIndex) if err != nil { s.logGeneric("flush to %d: while decrementing prevLogIndex: %s", peerID, err) return err } s.logGeneric("flush to %d: rejected; prevLogIndex(%d) becomes %d", peerID, peerID, newPrevLogIndex) return errAppendEntriesRejected } if len(entries) > 0 { // 复制成功,更新同步状态 newPrevLogIndex, err := ni.set(peer.id(), entries[len(entries)-1].Index, prevLogIndex) if err != nil { s.logGeneric("flush to %d: while moving prevLogIndex forward: %s", peerID, err) return err } s.logGeneric("flush to %d: accepted; prevLogIndex(%d) becomes %d", peerID, peerID, newPrevLogIndex) return nil } s.logGeneric("flush to %d: accepted; prevLogIndex(%d) remains %d", peerID, peerID, ni.prevLogIndex(peerID)) return nil } // Leader并发同步日志 func (s *Server)concurrentFlush(pm peerMap, ni *nextIndex, timeout time.Duration)(int,bool) { type tuple struct { id uint64 err error } responses := make(chan tuple, len(pm)) for _, peer := range pm { go func(peer Peer) { errChan := make(chan error, 1) go func() { errChan <- s.flush(peer, ni) }() go func() { time.Sleep(timeout); errChan <- errTimeout }() responses <- 
tuple{peer.id(), <-errChan} // first responder wins }(peer) } successes, stepDown := 0, false for i := 0; i < cap(responses); i++ { switch t := <-responses; t.err { case nil: s.logGeneric("concurrentFlush: peer %d: OK (prevLogIndex(%d)=%d)", t.id, t.id, ni.prevLogIndex(t.id)) successes++ case errDeposed: // 当前的Leder节点落后于其他节点 s.logGeneric("concurrentFlush: peer %d: deposed!", t.id) stepDown = true default: s.logGeneric("concurrentFlush: peer %d: %s (prevLogIndex(%d)=%d)", t.id, t.err, t.id, ni.prevLogIndex(t.id)) // nothing to do but log and continue } } return successes, stepDown } // 作为Leader角色运行 func (s *Server)leaderSelect() { if s.leader != s.id { panic(fmt.Sprintf("leader (%d) not me (%d) when entering leaderSelect", s.leader, s.id)) } if s.vote != 0 { panic(fmt.Sprintf("vote (%d) not zero when entering leaderSelect", s.leader)) } // 5.3 Log replication: "The leader maintains a nextIndex for each follower, // which is the index of the next log entry the leader will send to that // follower. When a leader first comes to power it initializes all nextIndex // values to the index just after the last one in its log." // // I changed this from lastIndex+1 to simply lastIndex. Every initial // communication from leader to follower was being rejected and we were // doing the decrement. This was just annoying, except if you manage to // sneak in a command before the first heartbeat. Then, it will never get // properly replicated (it seemed). // Leader为每个Follower保存了最新的同步日志索引 ni := newNextIndex(s.config.allPeers().except(s.id), s.log.lastIndex()) // +1) flush := make(chan struct{}) heartbeat := time.NewTicker(broadcastInterval()) defer heartbeat.Stop() go func() { // 发送心跳,除了检测心跳外,还有防止Follower发送选举 for _ = range heartbeat.C { flush <- struct{}{} } }() for { select { case q := <-s.quit: s.handleQuit(q) return // 收到命令 case t := <-s.commandChan: // Append the command to our (leader) log s.logGeneric("got command, appending") currentTerm := s.term entry := logEntry{ Index: s.log.lastIndex() + 1, Term: currentTerm, Command: t.Command, commandResponse: t.CommandResponse, } // 追加日志 if err := s.log.appendEntry(entry); err != nil { t.Err <- err continue } s.logGeneric( "after append, commitIndex=%d lastIndex=%d lastTerm=%d", s.log.getCommitIndex(), s.log.lastIndex(), s.log.lastTerm(), ) // Now that the entry is in the log, we can fall back to the // normal flushing mechanism to attempt to replicate the entry // and advance the commit index. We trigger a manual flush as a // convenience, so our caller might get a response a bit sooner. // 这里将日志同步放到了同步队列就返回给客户端了,正常来说,需要"共识"达成才返回给客户端 go func() { flush <- struct{}{} }() t.Err <- nil // 收到配置变更 case t := <-s.configurationChan: // Attempt to change our local configuration if err := s.config.changeTo(makePeerMap(t.Peers...)); err != nil { t.Err <- err continue } // Serialize the local (C_old,new) configuration encodedConfiguration, err := s.config.encode() if err != nil { t.Err <- err continue } // We're gonna write+replicate that config via log mechanisms. // Prepare the on-commit callback. 
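// The configuration entry built below carries a committed channel. The
// goroutine that follows blocks on it: if the entry is abandoned the change is
// rolled back via changeAborted; once it commits, changeCommitted finalizes
// the switch from C_old,new to C_new, and the leader shuts itself down if it
// is no longer part of the new configuration.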
entry := logEntry{ Index: s.log.lastIndex() + 1, Term: s.term, Command: encodedConfiguration, isConfiguration: true, committed: make(chan bool), } go func() { // 当日志被commited时,committed将被回调 committed := <-entry.committed if !committed { s.config.changeAborted() return } // 日志被committed了,说明其他节点都应用了最新的配置,所以当前的节点配置也需要更新 s.config.changeCommitted() if _, ok := s.config.allPeers()[s.id]; !ok { // 当前节点已被新集群剔除 s.logGeneric("leader expelled; shutting down") q := make(chan struct{}) s.quit <- q // 节点已退出 <-q } }() // 日志追加 if err := s.log.appendEntry(entry); err != nil { t.Err <- err continue } case <-flush: // 获取需要同步的节点 recipients := s.config.allPeers().except(s.id) // Special case: network of 1 if len(recipients) <= 0 { ourLastIndex := s.log.lastIndex() if ourLastIndex > 0 { if err := s.log.commitTo(ourLastIndex); err != nil { s.logGeneric("commitTo(%d): %s", ourLastIndex, err) continue } s.logGeneric("after commitTo(%d), commitIndex=%d", ourLastIndex, s.log.getCommitIndex()) } continue } // Normal case: network of at-least-2 // 并发同步日志 successes, stepDown := s.concurrentFlush(recipients, ni, 2*broadcastInterval()) if stepDown { // 节点已被卸任 s.logGeneric("deposed during flush") s.state.Set(follower) s.leader = unknownLeader return } // Only when we know all followers accepted the flush can we // consider incrementing commitIndex and pushing out another // round of flushes. if successes == len(recipients) { // 最小被同步的Index peersBestIndex := ni.bestIndex() ourLastIndex := s.log.lastIndex() ourCommitIndex := s.log.getCommitIndex() if peersBestIndex > ourLastIndex { // safety check: we've probably been deposed s.logGeneric("peers' best index %d > our lastIndex %d", peersBestIndex, ourLastIndex) s.logGeneric("this is crazy, I'm gonna become a follower") s.leader = unknownLeader s.vote = noVote s.state.Set(follower) return } if peersBestIndex > ourCommitIndex { // committed Leader Index if err := s.log.commitTo(peersBestIndex); err != nil { s.logGeneric("commitTo(%d): %s", peersBestIndex, err) // 比如某个Follower在同步Index时失败了, continue // oh well, next time? } if s.log.getCommitIndex() > ourCommitIndex { // 继续同步日志 s.logGeneric("after commitTo(%d), commitIndex=%d -- queueing another flush", peersBestIndex, s.log.getCommitIndex()) go func() { flush <- struct{}{} }() } } } // 追加日志, 正常来说,Leader节点是不会受到该命令的,出现这种的可能是集群存在一个新的Leader节点,这命令就是该Leader发送过来的 case t := <-s.appendEntriesChan: resp, stepDown := s.handleAppendEntries(t.Request) s.logAppendEntriesResponse(t.Request, resp, stepDown) t.Response <- resp if stepDown { s.logGeneric("after an appendEntries, deposed to Follower (leader=%d)", t.Request.LeaderID) s.leader = t.Request.LeaderID s.state.Set(follower) return // deposed } // 受到投票请求 case t := <-s.requestVoteChan: resp, stepDown := s.handleRequestVote(t.Request) s.logRequestVoteResponse(t.Request, resp, stepDown) t.Response <- resp if stepDown { s.logGeneric("after a requestVote, deposed to Follower (leader unknown)") s.leader = unknownLeader s.state.Set(follower) return // deposed } } } } // handleRequestVote will modify s.term and s.vote, but nothing else. // stepDown means you need to: s.leader=unknownLeader, s.state.Set(Follower). // 处理投票 // 可能会修改s.term和s.vote 的值; stepDown意味着需要设置s.leader = unkownLeader, s.state.Set(Follower) func (s *Server)handleRequestVote(rv requestVote)(requestVoteResponse,bool) { // Spec is ambiguous here; basing this (loosely!) 
on benbjohnson's impl // If the request is from an old term, reject if rv.Term < s.term { return requestVoteResponse{ Term: s.term, VoteGranted: false, reason: fmt.Sprintf("Term %d < %d", rv.Term, s.term), }, false } // If the request is from a newer term, reset our state stepDown := false if rv.Term > s.term { // 本地节点落后于集群的其他节点,需要更新一下自己的任期号 s.logGeneric("requestVote from newer term (%d): we defer", rv.Term) s.term = rv.Term s.vote = noVote s.leader = unknownLeader stepDown = true } // Special case: if we're the leader, and we haven't been deposed by a more // recent term, then we should always deny the vote if s.state.Get() == leader && !stepDown { // 如果本地节点是Leader,并且又不落后于req 节点,则投反对票 return requestVoteResponse{ Term: s.term, VoteGranted: false, reason: "already the leader", }, stepDown } // If we've already voted for someone else this term, reject // 如果已经投过票,则投失败票 if s.vote != 0 && s.vote != rv.CandidateID { if stepDown { panic("impossible state in handleRequestVote") } return requestVoteResponse{ Term: s.term, VoteGranted: false, reason: fmt.Sprintf("already cast vote for %d", s.vote), }, stepDown } // If the candidate log isn't at least as recent as ours, reject if s.log.lastIndex() > rv.LastLogIndex || s.log.lastTerm() > rv.LastLogTerm { return requestVoteResponse{ Term: s.term, VoteGranted: false, reason: fmt.Sprintf( "our index/term %d/%d > %d/%d", s.log.lastIndex(), s.log.lastTerm(), rv.LastLogIndex, rv.LastLogTerm, ), }, stepDown } // We passed all the tests: cast vote in favor s.vote = rv.CandidateID s.resetElectionTimeout() return requestVoteResponse{ Term: s.term, VoteGranted: true, }, stepDown } // handleAppendEntries will modify s.term and s.vote, but nothing else. // stepDown means you need to: s.leader=r.LeaderID, s.state.Set(Follower). // 追加日志,需要注意的是,handleAppendEntries也会修改s.term和s.vote // stepDown也会修改s.Leader, s,state // 需要注意的是,本地节点的state不同时,其行为也是不用的 func (s *Server)handleAppendEntries(r appendEntries)(appendEntriesResponse,bool) { // Spec is ambiguous here; basing this on benbjohnson's impl // Maybe a nicer way to handle this is to define explicit handler functions // for each Server state. Then, we won't try to hide too much logic (i.e. // too many protocol rules) in one code path. // If the request is from an old term, reject if r.Term < s.term { return appendEntriesResponse{ Term: s.term, Success: false, reason: fmt.Sprintf("Term %d < %d", r.Term, s.term), }, false } // If the request is from a newer term, reset our state stepDown := false if r.Term > s.term { s.term = r.Term s.vote = noVote stepDown = true } // Special case for candidates: "While waiting for votes, a candidate may // receive an appendEntries RPC from another server claiming to be leader. // If the leader’s term (included in its RPC) is at least as large as the // candidate’s current term, then the candidate recognizes the leader as // legitimate and steps down, meaning that it returns to follower state." 
if s.state.Get() == candidate && r.LeaderID != s.leader && r.Term >= s.term { s.term = r.Term s.vote = noVote stepDown = true } // In any case, reset our election timeout s.resetElectionTimeout() // Reject if log doesn't contain a matching previous entry // 如果{PreLogIndex, PreLogTerm} 不是最新的条目,则失败 // [{1, 2},{1, 3}, {1,4},{1,5},{1,6}] => {1,5} => [{1, 2},{1, 3}, {1,4},{1,5}] if err := s.log.ensureLastIs(r.PrevLogIndex, r.PrevLogTerm); err != nil { return appendEntriesResponse{ Term: s.term, Success: false, reason: fmt.Sprintf( "while ensuring last log entry had index=%d term=%d: error: %s", r.PrevLogIndex, r.PrevLogTerm, err, ), }, stepDown } // Process the entries for i, entry := range r.Entries { // Configuration changes requre special preprocessing var pm peerMap // 处理配置 if entry.isConfiguration { commandBuf := bytes.NewBuffer(entry.Command) if err := gob.NewDecoder(commandBuf).Decode(±); err != nil { panic("gob decode of peers failed") } if s.state.Get() == leader { // TODO should we instead just ignore this entry? return appendEntriesResponse{ Term: s.term, Success: false, reason: fmt.Sprintf( "AppendEntry %d/%d failed (configuration): %s", i+1, len(r.Entries), "Leader shouldn't receive configurations via appendEntries", ), }, stepDown } // Expulsion recognition if _, ok := pm[s.id]; !ok { entry.committed = make(chan bool) go func() { if <-entry.committed { s.logGeneric("non-leader expelled; shutting down") q := make(chan struct{}) s.quit <- q <-q } }() } } // Append entry to the log if err := s.log.appendEntry(entry); err != nil { return appendEntriesResponse{ Term: s.term, Success: false, reason: fmt.Sprintf( "AppendEntry %d/%d failed: %s", i+1, len(r.Entries), err, ), }, stepDown } // "Once a given server adds the new configuration entry to its log, it // uses that configuration for all future decisions (it does not wait // for the entry to become committed)." if entry.isConfiguration { if err := s.config.directSet(pm); err != nil { return appendEntriesResponse{ Term: s.term, Success: false, reason: fmt.Sprintf( "AppendEntry %d/%d failed (configuration): %s", i+1, len(r.Entries), err, ), }, stepDown } } } // Commit up to the commit index. // // < ptrb> ongardie: if the new leader sends a 0-entry appendEntries // with lastIndex=5 commitIndex=4, to a follower that has lastIndex=5 // commitIndex=5 -- in my impl, this fails, because commitIndex is too // small. shouldn't be? // <@ongardie> ptrb: i don't think that should fail // <@ongardie> there are 4 ways an appendEntries request can fail: (1) // network drops packet (2) caller has stale term (3) would leave gap in // the recipient's log (4) term of entry preceding the new entries doesn't // match the term at the same index on the recipient // // 出现这种情况的原因可能是本地节点运行到committed逻辑的时候出现了问题,或者说应答给Leader时,网络出现了问题等等。 // 这些情况都会造成数据不同步的情况,也就是本地节点的commiitted情况和Leader节点保存的Follower(本地节点)不一致 if r.CommitIndex > 0 && r.CommitIndex > s.log.getCommitIndex() { if err := s.log.commitTo(r.CommitIndex); err != nil { return appendEntriesResponse{ Term: s.term, Success: false, reason: fmt.Sprintf("CommitTo(%d) failed: %s", r.CommitIndex, err), }, stepDown } } // all good return appendEntriesResponse{ Term: s.term, Success: true, }, stepDown } </code></pre> <h3>configuration.go</h3> <pre> <code class="language-go">var ( errConfigurationAlreadyChanging = errors.New("configuration already changing") ) const ( cOld = "C_old" cOldNew = "C_old,new" ) // configuration represents the sets of peers and behaviors required to // implement joint-consensus. 
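
// ----------------------------------------------------------------------------
// Illustrative sketch, not part of the original configuration.go: the
// joint-consensus rule that pass() below implements. While the configuration
// is in the C_old,new state, a vote (or a log entry) must win a majority of
// C_old AND a majority of C_new independently; one combined majority is not
// enough. The helper restates that rule over plain id sets and is only an
// illustration, not an API of the package.
func jointQuorumPass(cOld, cNew []uint64, votes map[uint64]bool) bool {
	majority := func(ids []uint64) bool {
		need := len(ids)/2 + 1
		have := 0
		for _, id := range ids {
			if votes[id] {
				have++
			}
		}
		return have >= need
	}
	if len(cNew) == 0 { // stable C_old state: only the old cluster matters
		return majority(cOld)
	}
	return majority(cOld) && majority(cNew) // C_old,new: both majorities required
}
// ----------------------------------------------------------------------------
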
type configuration struct { sync.RWMutex state string // 老配置 cOldPeers peerMap // 新配置-》用于过度 cNewPeers peerMap } // newConfiguration returns a new configuration in stable (C_old) state based // on the passed peers. func newConfiguration(pm peerMap)*configuration { return &configuration{ state: cOld, // start in a stable state, cOldPeers: pm, // with only C_old } } // directSet is used when bootstrapping, and when receiving a replicated // configuration from a leader. It directly sets the configuration to the // passed peers. It's assumed this is called on a non-leader, and therefore // requires no consistency dance. // 配置变更 func (c *configuration)directSet(pm peerMap)error { c.Lock() defer c.Unlock() c.cOldPeers = pm c.cNewPeers = peerMap{} c.state = cOld return nil } func (c *configuration)get(iduint64)(Peer,bool) { c.RLock() defer c.RUnlock() if peer, ok := c.cOldPeers[id]; ok { return peer, true } if peer, ok := c.cNewPeers[id]; ok { return peer, true } return nil, false } func (c *configuration)encode()([]byte, error) { buf := &bytes.Buffer{} if err := gob.NewEncoder(buf).Encode(c.allPeers()); err != nil { return []byte{}, err } return buf.Bytes(), nil } // allPeers returns the union set of all peers in the configuration. func (c *configuration)allPeers()peerMap { c.RLock() defer c.RUnlock() union := peerMap{} for id, peer := range c.cOldPeers { union[id] = peer } for id, peer := range c.cNewPeers { union[id] = peer } return union } // pass returns true if the votes represented by the votes map are sufficient // to constitute a quorum. pass respects C_old,new requirements, which dictate // that any request must receive a majority from both C_old and C_new to pass. // 共识判断 func (c *configuration)pass(votesmap[uint64]bool)bool { c.RLock() defer c.RUnlock() // Count the votes cOldHave, cOldRequired := 0, c.cOldPeers.quorum() for id := range c.cOldPeers { if votes[id] { cOldHave++ } if cOldHave >= cOldRequired { break } } // If we've already failed, we can stop here if cOldHave < cOldRequired { return false } // C_old passes: if we're in C_old, we pass if c.state == cOld { return true } // Not in C_old, so make sure we have some peers in C_new if len(c.cNewPeers) <= 0 { panic(fmt.Sprintf("configuration state '%s', but no C_new peers", c.state)) } // Since we're in C_old,new, we need to also pass C_new to pass overall. // It's important that we range through C_new and check our votes map, and // not the other way around: if a server casts a vote but doesn't exist in // a particular configuration, that vote should not be counted. 
cNewHave, cNewRequired := 0, c.cNewPeers.quorum() for id := range c.cNewPeers { if votes[id] { cNewHave++ } if cNewHave >= cNewRequired { break } } return cNewHave >= cNewRequired } // 配置变更准备, prepare-change func (c *configuration)changeTo(pm peerMap)error { c.Lock() defer c.Unlock() if c.state != cOld { return errConfigurationAlreadyChanging } if len(c.cNewPeers) > 0 { panic(fmt.Sprintf("configuration ChangeTo in state '%s', but have C_new peers already", c.state)) } c.cNewPeers = pm c.state = cOldNew return nil } // 提交变更逻辑 func (c *configuration)changeCommitted() { c.Lock() defer c.Unlock() if c.state != cOldNew { panic("configuration ChangeCommitted, but not in C_old,new") } if len(c.cNewPeers) <= 0 { panic("configuration ChangeCommitted, but C_new peers are empty") } c.cOldPeers = c.cNewPeers c.cNewPeers = peerMap{} c.state = cOld } // 中断变更 func (c *configuration)changeAborted() { c.Lock() defer c.Unlock() if c.state != cOldNew { panic("configuration ChangeAborted, but not in C_old,new") } c.cNewPeers = peerMap{} c.state = cOld } </code></pre> <h2>Demo</h2> <pre> <code class="language-go">package main import ( "bytes" "fmt" "hash/fnv" "net/http" "net/url" "time" "github.com/peterbourgon/raft" ) func main() { a := func(idxuint64, cmd []byte)[]byte { fmt.Printf("%d, apply function: %s\n", idx, cmd) return cmd } mustParseURL := func(rawURLstring)*url.URL { u, _ := url.Parse(rawURL) u.Path = "" return u } mustNewHTTPPeer := func(u *url.URL)raft.Peer { p, err := raft.NewHTTPPeer(u) if err != nil { panic(err) } return p } peersAddr := []string{ "127.0.0.1:7090", "127.0.0.1:7091", "127.0.0.1:7092", "127.0.0.1:7093", "127.0.0.1:7094"} var ss []*raft.Server for _, addr := range peersAddr { hash := fnv.New64() hash.Write([]byte(addr)) id := hash.Sum64() hash.Reset() s := raft.NewServer(id, &bytes.Buffer{}, a) mux := http.NewServeMux() raft.HTTPTransport(mux, s) go func(addrstring) { if err := http.ListenAndServe(addr, mux); err != nil { panic(err) } }(addr) ss = append(ss, s) } time.Sleep(time.Second) for _, s := range ss { s.SetConfiguration( mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7090")), mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7091")), mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7092")), mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7093")), mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7094")), ) s.Start() } for { cmd := []byte(time.Now().String()) cmdChan := make(chan []byte) go ss[0].Command(cmd, cmdChan) <-cmdChan time.Sleep(time.Millisecond * 500) } time.Sleep(time.Hour) } </code></pre> <p>Run</p> <pre> <code class="language-go">» go run raft-server.go 2>/dev/null 1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST 1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST 1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST 1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST 1, apply function: 2017-09-11 11:41:13.668460404 +0800 CST 2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST 2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST 2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST 2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST 2, apply function: 2017-09-11 11:41:14.169165702 +0800 CST 3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST 3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST 3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST 3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST 3, apply function: 2017-09-11 11:41:14.670873193 +0800 CST 4, apply function: 2017-09-11 
11:41:15.171741805 +0800 CST 4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST 4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST 4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST 4, apply function: 2017-09-11 11:41:15.171741805 +0800 CST 5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST 5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST 5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST 5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST 5, apply function: 2017-09-11 11:41:15.673498401 +0800 CST 6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST 6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST 6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST 6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST 6, apply function: 2017-09-11 11:41:16.175658603 +0800 CST 7, apply function: 2017-09-11 11:41:16.677758823 +0800 CST </code></pre> <p> </p> <p>来自:http://laohanlinux.github.io/2017/09/11/raft源码分析/</p> <p> </p>
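<p>Building on the demo above: a cluster membership change is submitted the same way as a command. Calling SetConfiguration on any server routes the request to the leader (followers forward it), which replicates a C_old,new configuration entry; once that entry commits, the cluster operates under C_new. The sketch below is an illustration under stated assumptions, not code from the repository: it assumes the five demo servers (ss) are still running, that a sixth server with its own HTTP transport has already been started on 127.0.0.1:7095, and that mustNewHTTPPeer and mustParseURL are the helpers defined in the demo's main.</p>
<pre> <code class="language-go">// Hypothetical follow-up to the demo: grow the cluster from five to six nodes.
peers := []raft.Peer{
	mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7090")),
	mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7091")),
	mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7092")),
	mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7093")),
	mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7094")),
	mustNewHTTPPeer(mustParseURL("http://127.0.0.1:7095")), // the new node
}
// Any server routes this to the leader, which drives the C_old,new -> C_new
// transition through the normal log replication path.
if err := ss[0].SetConfiguration(peers...); err != nil {
	panic(err)
}
</code></pre>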