func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor
lease 的创建过程比较简单
initAndRecover()
runLoop()
在参数中我们需要注意
backend
为 etcd 对 bolt
操作的封装
minLeaseTTL
由公式计算得出 time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
checkpointInterval
默认值是 5分钟,没看出传进来的是什么
expiredLeaseRetryInterval
默认值 3秒,传入以下值
// etcdserver/config.go
func (c *ServerConfig) ReqTimeout() time.Duration {
// 5s for queue waiting, computation and disk IO delay
// + 2 * election timeout for possible leader election
return 5*time.Second + 2*time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
}
leaseMap
用来存放 leaseID 到 lease struct 的映射关系
itemMap
用来存放 leaseItem 到 leaseID 的映射关系,其实就是 key 到 leaseID 的映射
leaseExpiredNotifier
存放将要过期的 lease 的队列,从队列取出的第一个元素为最近要过期的 lease
leaseCheckpointHeap
这个函数包含了 init 和 recover 两个动作
func (le *lessor) initAndRecover() {
// 获取 backend 的事务操作并加锁
tx := le.b.BatchTx()
tx.Lock()
// 在 backend(bolt)中创建 lease bucket
tx.UnsafeCreateBucket(leaseBucketName)
_, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
// 将 lease 恢复到 lessor.leaseMap 中
// **注意在这里并不恢复 key 和 lease 的关系
// 而是在 mvcc.KV recover 的过程中来重新建立关联,它调用了 leasor.Attach()**
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
...
le.leaseMap[ID] = &Lease{
ID: ID,
ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted
itemSet: make(map[LeaseItem]struct{}),
expiry: forever,
revokec: make(chan struct{}),
}
}
// 初始化 leaseExpiredNotifier 和 leaseCheckpointHeap
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
// 解锁并提交事务
tx.Unlock()
le.b.ForceCommit()
}