要理解 watchableStore 我们首先得搞清楚 storestore 其实是 ConsistentKV,而 watchableStore 其实只是实现了 watchable 的功能而已,所有的 KV interface 都是由 store 来完成的。

创建 store

简化后流程如下(省略了一些不关键的代码

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
	s := &store{
			// 在内存中创建 btree 索引
			kvindex: newTreeIndex(lg),
      // 初始化的时候 revision 从 1 开始
			currentRev:     1,
			compactMainRev: -1,
			// 先入先出的 job 调度队列
			fifoSched: schedule.NewFIFOScheduler(),
		}
		s.ReadView = &readView{s}
		s.WriteView = &writeView{s}
		if s.le != nil {
      // lessor 在 lease 过期 之后会调用这个 RangeDeleter 来删除 mvcc 中对应的资源
			s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
		}
	
		tx := s.b.BatchTx()
		tx.Lock()
    // key bucket 就是用来存 key-value 的
		tx.UnsafeCreateBucket(keyBucketName)
    // meta bucket 用来存 etcd 相关的元数据,比如 finishedCompactRevision scheduledCompactRevision
		tx.UnsafeCreateBucket(metaBucketName)
		tx.Unlock()
		s.b.ForceCommit()
	
		s.mu.Lock()
		defer s.mu.Unlock()

    // 从文件中恢复 mvcc
		if err := s.restore(); err != nil {
			panic("failed to recover store from backend")
		}
	
		return s
}

restore

Watchable

在了解清楚了 store 的存储和索引机制之后,我们来看看它是怎么实现 watch 的功能。

首先理解一下完整的调用链。这里有很多知识点,画重点,要考的

client 通过一次 gRPC 请求跟 etcd 服务器建立一个 gRPC stream,同时 etcd 服务器为这个客户端维护了一个 watchStream,然后客户端通过 gRPC stream 发送一个 WatchCreateRequest 请求 etcd 创建一个 watcher channel,至此 watch 通道创建完毕。

stream 的模型如下,理论上一个 client 对应一个 gRPC stream 和 watch stream,同一个 client 的多个 watcher 共享一个 stream 达到多路复用的效果,通过事件中的 watcher id 来确定归属