etcd raft pkg 在使用上有一些难度,在设计上将 raft 算法的逻辑和实际的持久化,网络完全解耦,即存储和网络层都需要调用方实现,额外的还需要按照一定要求的逻辑来调用 raft 的接口来驱动 state machine 进行状态变更
在 raft 包中有以下重要的结构
同时与这个包相关联的结构有
虚假的 raft node → raft.node 只包含了各种 channel 本身就是个传声筒,啥也不干就是在导数据
真正的 raft node → etcdserver.raftNode 包含了真正的 storage 和 transport 实现,有完整的 raft 处理逻辑,负责消费各种输入输出,调用 raft 接口来驱动 state machine
整个 raft 包的入口是 Node
,要启动一个 raft 集群有两种方式
如果我们要启动 3 节点集群中的节点 1,则需要
func startNode() {
// 用 raft 包实现的 memory storage 做存储
s = raft.NewMemoryStorage()
c := &raft.Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: 1024*1024,
MaxInflightMsgs: 256,
CheckQuorum: true,
PreVote: true,
}
n = raft.StartNode(c, []raft.Peer{{ID:0x02}, {ID:0x03}})
}
// raft/node.go
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node
如果是从日志中恢复一个节点,则需要注意先将存储恢复到之前的状态然后再重启
func restartNode() {
w, id, cid, hardState, entries := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
// 用 raft 包实现的 memory storage 做存储
s = raft.NewMemoryStorage()
// 恢复 snapshot,hardstate,将 entries 全部加入存储中去
s.ApplySnapshot(snap)
s.SetHardState(hardState)
s.Append(entries)
c := &raft.Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: 1024*1024,
MaxInflightMsgs: 256,
CheckQuorum: true,
PreVote: true,
}
n = raft.RestartNode(c, []raft.Peer{{ID:0x02}, {ID:0x03}})
}
// raft/node.go
// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node