概览

etcd raft pkg 在使用上有一些难度,在设计上将 raft 算法的逻辑和实际的持久化,网络完全解耦,即存储和网络层都需要调用方实现,额外的还需要按照一定要求的逻辑来调用 raft 的接口来驱动 state machine 进行状态变更

在 raft 包中有以下重要的结构

同时与这个包相关联的结构有

真假 raft node

虚假的 raft node → raft.node 只包含了各种 channel 本身就是个传声筒,啥也不干就是在导数据

真正的 raft node → etcdserver.raftNode 包含了真正的 storage 和 transport 实现,有完整的 raft 处理逻辑,负责消费各种输入输出,调用 raft 接口来驱动 state machine

入口

整个 raft 包的入口是 Node,要启动一个 raft 集群有两种方式

如果我们要启动 3 节点集群中的节点 1,则需要

func startNode() {
  // 用 raft 包实现的 memory storage 做存储
	s = raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              0x01,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   1024*1024,
		MaxInflightMsgs: 256,
		CheckQuorum:     true,
		PreVote:         true,
	}
  n = raft.StartNode(c, []raft.Peer{{ID:0x02}, {ID:0x03}})
}

// raft/node.go
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node

如果是从日志中恢复一个节点,则需要注意先将存储恢复到之前的状态然后再重启

func restartNode() {
  w, id, cid, hardState, entries := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
  // 用 raft 包实现的 memory storage 做存储
	s = raft.NewMemoryStorage()
  // 恢复 snapshot,hardstate,将 entries 全部加入存储中去
  s.ApplySnapshot(snap)
  s.SetHardState(hardState)
  s.Append(entries)

	c := &raft.Config{
		ID:              0x01,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   1024*1024,
		MaxInflightMsgs: 256,
		CheckQuorum:     true,
		PreVote:         true,
	}
  n = raft.RestartNode(c, []raft.Peer{{ID:0x02}, {ID:0x03}})
}

// raft/node.go
// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node