MVCC,全称Multi-Version Concurrency Control,即多版别并发操控,是数据库并发范畴的一个根底概念,也是在必定业务场景下解决读写并发问题的一种思路,在数据库范畴,并发操控是一个很具有挑战性的范畴,除了MVCC,常见的并发操控方法还有达观并发操控、失望并发操控等。
达观并发操控
达观并发操控(达观锁)也是一种并发操控的方法。它假设多个并发业务彼此之间不会相互影响,各业务能够在无锁的情况下处理各自的那部分数据,它一般分为三个阶段:读阶段、查看阶段、写阶段。
读阶段:数据库会履行业务中的悉数读操作和写操作,并将一切写后的值存入临时变量中,并不会真正更新数据库中的内容;
**查看阶段:**数据库程序会查看当时的改动是否合法,也便是是否有其他业务在 RAED PHASE 期间更新了数据。
**写阶段:**假如数据经过检测那么直接就进入 WRITE PHASE 将一切存在临时变量中的改动悉数写入数据库,没有经过测试的业务会直接被停止。
注:有点类似于memcache中CAS指令的感觉 ^_^
失望并发操控
失望并发操控(失望锁)它能够阻止一个业务影响其他用户来修正数据,当业务需求对资源进行操作时需求先取得资源对应的锁,确保其他业务不会拜访该资源后,对资源进行各种操作,完结较为简略,首要用于数据争用激烈、产生并发冲突时运用锁保护数据的本钱要低于回滚业务的本钱的环境中。
数据库系统比较常用的失望锁协议是两阶段锁协议(2PL,Two-Phase Locking),2PL就说一个业务被分成两个阶段:加锁阶段、开释锁阶段,其间加锁阶段有必要要在开释锁阶段完结,开释锁阶段不能在获取新的锁。一起2PL还有许多衍生概念,如下:
**保存两阶段锁(Conservative 2PL,C2PL):**业务在开始时对它依靠的一切资源进行加锁,在业务履行进程中能够开释部分不需求保护的资源的锁,不用等到业务完毕开释锁。
**严厉两阶段锁(Strict 2PL,S2PL):**业务在开始时对资源逐步加锁,也便是说业务能够在履行进程中对资源加读写写锁,可是写锁只能在业务完毕一致开释,读锁能够在食物履行进程中开释。
**强严厉两阶段锁(Strong Strict 2PL,SS2PL):**业务在开始时对资源逐步加锁,也便是说业务能够在履行进程中对资源加读写写锁,可是读锁、写锁只能在业务完毕一致开释。
多版别并发操控
多版别并发操控(Multiversion Concurrency Control,MVCC)并不是一个与达观并发操控和失望并发操控对立的概念,它能够与两者很好地结合以添加业务的并发量,MVCC 的每一个写操作都会创立一份新版别的数据,读操作会从有限多个版别的数据中挑选 “最适合版别”(最新版别、指定版别)的数据作为结果直接回来。目前最盛行的联系型数据库MySQL对MVCC的完结,经过在Undo空间中保存多分改变记载(创立、修正、删去)也是完结其MVCC的根底,详细比较详细的MYSQL MVCC原理介绍,参阅:正确的理解MySQL的MVCC及完结原理.
etcd的mvcc模块
和Mysql支持并发写操作不同, etcd首要是依据sigle-raft算法,而且etcd数据存储层运用boltdb,当etcd向底层 boltdb 层发起读写业务时,都会请求一个业务锁,该业务锁的粒度较粗,一切的读写都将受限,etcd对数据库的拜访只支持顺序写入,因而,etcd的mvcc机制更多的是在提高读并发功能,etcd在raft日志apply及写blotdb间加一层写buffer,在必定程度上是能够提高写功能的(详细怎么确保写buffer内的业务在机器crash时不丢掉,是etcd crash-safe模块要做的事情,在介绍完crash-safe依靠的组建之后,会重点整理下crash-safe流程).
mvcc模块的根底概念
revision(修订/修正)
revision的概念有点类似于业务ID,每次数据库提交一个修正业务,大局revision都会加一,该大局值保护在详细的数据库方针中的,而且每个写业务也都会分配revision方针,revision方针包括两个字段:
main:表明当时业务的revision,同一个业务内的一切修正操作共用一个main(即便在复合业务Txn中,也是只调用一次End函数来对写业务进行收尾,递加一次store.currentRev)
sub:表明业务内部改变的id[0-n],同业务内的经过sub区分多个修正操作
// A revision indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
generation(一代/重生)
key被创立到删去称为generation,用来记载一个key又创立-删去运用的过的revision,假如一个key被频频修正,或许generation内容会过多,而且每个revision在boltdb中都有对应的实际数据,etcd首要是经过紧缩操作收回generation、revision及boltdb中冗余数据。
generation包括三个字段:
**ver :**记载key在本代阅历了多少次修正
**created:**创立该generation时数据的revision
**revs:**记载key在本代经每次被修正时的revision信息(紧缩时会被切断)
// generation contains multiple revisions of a key.
type generation struct {
ver int64
created revision // when the generation is created (put in first revision).
revs []revision
}
注:当key被删去时,会在generation里边追加一个空的revision,而且boltdb里运用revision构建索引时,会在索引后边追加一个字符’t’。
mvcc存储结构
etcd的mvcc模块能够分为两部分:索引组件、存储组件,如下图所示:
索引组件
etcd索引组件运用B树完结,运用数据key构建B树索引,B树运用的是开源pkg,详细如下:
github.com/google/btree
B树的数据部分为keyIndex,keyIndex保存了key相关的generation信息,keyIndex的数据结构界说如下:
type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
generations []generation
}
存储组件
etcd的存储组件首要是依据boltdb完结的,存储相关的两个bucket:key、meta
key
用来存储用户k-v数据,其间序列化后的revision做索引k,序列化后的mvccpb.KeyValue做v,其间mvccpb.KeyValue的界说在如下文件中:
https://github.com/etcd-io/etcd/blob/v3.4.9/mvcc/mvccpb/kv.pb.go
meta
用来存储mvcc运行进程中的meta数据,如:compact的状态、mvcc数据库中终究一个业务相关日志Entry的index等。
mvcc源码剖析
mvcc要害接口
mvcc模块完结相关的代码在mvcc目录,github代码链接如下:
https://github.com/etcd-io/etcd/tree/v3.4.9/mvcc
其间kv.go界说了mvcc模块的一些要害接口,其间比较要害的几个接口如下:
ReadView
ReadView接口界说了mvcc履行读操作时,为当时请求树立一个读视图需求的方法,如下:
**FirstRev:**处理当时读业务时,没有被compacted的最大的revision
**Rev:**处理当时读业务时,读视图内能够看到的已完结的最大业务revision
**Range:**依照条件在当时ReadView内读取满意RangeOptions的数据
type ReadView interface {
// FirstRev returns the first KV revision at the time of opening the txn.
// After a compaction, the first revision increases to the compaction
// revision.
FirstRev() int64
// Rev returns the revision of the KV at the time of opening the txn.
Rev() int64
// Range gets the keys in the range at rangeRev.
// The returned rev is the current revision of the KV when the operation is executed.
// If rangeRev <=0, range gets the keys at currentRev.
// If `end` is nil, the request returns the key.
// If `end` is not nil and not empty, it gets the keys in range [key, range_end).
// If `end` is not nil and empty, it gets the keys greater than or equal to key.
// Limit limits the number of keys returned.
// If the required rev is compacted, ErrCompacted will be returned.
Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
TxnRead
_TxnRead是对_ReadView的扩展+封装,并供给End操作,End对读业务做收尾工作,_TxnRead_接口界说如下:
// TxnRead represents a read-only transaction with operations that will not
// block other read transactions.
type TxnRead interface {
ReadView
// End marks the transaction is complete and ready to commit.
End()
}
WriteView
WriteView是写业务视图,界说了写业务视图支持的操作,详细界说如下:
type WriteView interface {
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
// The number of key deleted will be returned.
// The returned rev is the current revision of the KV when the operation is executed.
// It also generates one event for each key delete in the event history.
// if the `end` is nil, deleteRange deletes the key.
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64)
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
// The returned rev is the current revision of the KV when the operation is executed.
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
TxnWrite
_TxnWrite是对写视图及扩展,除了扩展写视图的收尾函数End,因为etcd除了对外供给简略的Put/Get/Delete接口之外,还有Txn接口,Txn接口以if+then+else方式,能够在If中传递比较操作,比较条件更多的是读操作,因而TxnWrite还封装了_TxnRead。
// TxnWrite represents a transaction that can modify the store.
type TxnWrite interface {
TxnRead
WriteView
// Changes gets the changes made since opening the write txn.
Changes() []mvccpb.KeyValue
}
对外KV接口
除了上面几个数据库业务视图相关的接口,mvcc模块还供给了各类用处的KV接口,将mvcc的业务才能供给出去,详细接口分别如下:
**KV:**界说了根底KV读写数据库需求完结的接口
**WatchableKV:**对KV做进一步封装、扩展,供给对是读写事件的watch才能
**ConsistentWatchableKV:**对KV/WatchableKV做进一步封装、扩展,mvcc在保存业务的一起,会保存业务相关的日志Entry的index,并将index耐久化,后续服务重启,能够过滤掉现已Apply到mvcc里边的日志Entry
type KV interface {
ReadView
WriteView
// Read creates a read transaction.
Read(trace *traceutil.Trace) TxnRead
// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
// Commit commits outstanding txns into the underlying backend.
Commit()
// Restore restores the KV store from a backend.
Restore(b backend.Backend) error
Close() error
}
// WatchableKV is a KV that can be watched.
type WatchableKV interface {
KV
Watchable
}
// Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface {
// NewWatchStream returns a WatchStream that can be used to
// watch events happened or happening on the KV.
NewWatchStream() WatchStream
}
// ConsistentWatchableKV is a WatchableKV that understands the consistency
// algorithm and consistent index.
// If the consistent index of executing entry is not larger than the
// consistent index of ConsistentWatchableKV, all operations in
// this entry are skipped and return empty response.
type ConsistentWatchableKV interface {
WatchableKV
// ConsistentIndex returns the current consistent index of the KV.
ConsistentIndex() uint64
}
结构mvcc方针
对外KV接口一致由mvcc模块的store组件完结,store代码界说在如下文件:
https://github.com/etcd-io/etcd/blob/v3.4.9/mvcc/kvstore.go
https://github.com/etcd-io/etcd/blob/v3.4.9/mvcc/kvstore_txn.go
store经层层包装,经过ConsistentWatchableKV暴露到外部,调用mvcc.New函数便能够结构出一个ConsistentWatchableKV方针,如下:
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
return newWatchableStore(lg, b, le, as, ig, cfg)
}
可是结构一个mvcc方针的参数里边,有两个要害参数需求特别重视下:
**backend.Backend:**默认运用blotdb完结
**ConsistentIndexGetter:**mvcc组件在commit业务时回经过ConsistentIndexGetter获取到当时业务相关日志Entry的Index,在apply业务之前ConsistentIndexGetter的index值由应用层更新。
Put源码剖析
mvcc在进行写业务之前,首要会经过Write函数拿到写业务上下文及视图,代码如下:
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.Lock()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
tw.put(key, value, lease)
return tw.beginRev + 1
}
流程终究会走到put接口,在put接口内部的流程如下:
-
经过tw.beginRev + 1获取本业务的revision.main
-
获取key创立时的revision,假如key存在,在当时业务视图下取该key最新的revision,递加该key修正次数ver,并将修正次数记载到该revision中,同一generation内一切revision运用同一个CreateRevision,因而从索引里边首要是查询当时业务视图下该generation的终究一个revision,便能够获取到CreateRevision
// if the key exists before, use its previous created and // get its previous leaseID _, created, ver, err := tw.s.kvindex.Get(key, rev) if err == nil { c = created.main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) }
-
构建本次业务的revision方针,并经过revision方针构建blotdb中value的key,经过前面拿到的CreateRevision及ver构建value方针,将数据写入业务批量buffer层,将该key的revision存入索引中,注:最新put的key,key的修正次数ver一直等于generation当时ver+1。
ibytes := newRevBytes() idxRev := revision{main: rev, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) ver = ver + 1 kv := mvccpb.KeyValue{ Key: key, Value: value, CreateRevision: c, ModRevision: rev, Version: ver, Lease: int64(leaseID), } d, err := kv.Marshal() if err != nil { if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Fatal( "failed to marshal mvccpb.KeyValue", zap.Error(err), ) } else { plog.Fatalf("cannot marshal event: %v", err) } } tw.trace.Step("marshal mvccpb.KeyValue") tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv)
put函数完结后,调用End函数,End函数的首要逻辑如下
-
经过saveIndex保存本次业务相关日志Entry的index,日志Entry的index首要是经过store方针的ConsistentIndexGetter元素获取到
-
递加currentRev,开释履行业务时的锁,为开启下一次业务做准备
func (tw *storeTxnWrite) End() { // only update index if the txn modifies the mvcc state. if len(tw.changes) != 0 { tw.s.saveIndex(tw.tx) // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ } tw.tx.Unlock() if len(tw.changes) != 0 { tw.s.revMu.Unlock() } tw.s.mu.RUnlock() }
DeleteRange源码剖析
拿到写业务上下文TxnWrite,进程同Put操作,然后调用其DeleteRange方法,当参数end为nil时,只删去key,不然删去[key, end)之间的内容,详细函数界说如下:
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
return n, tw.beginRev + 1
}
return 0, tw.beginRev
}
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
rrev := tw.beginRev
if len(tw.changes) > 0 {
rrev++
}
keys, _ := tw.s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for _, key := range keys {
tw.delete(key)
}
return int64(len(keys))
}
在deleteRange函数内部,假如之前有过改变操作,会递加rrev,原因:或许存在同一个业务中既有Put操作,也有Delete操作,递加rrev是为了删去本业务内履行本删去操作前创立的revision。
然后,运用调整后的rrev从索引中遍历找出一切满意条件的key,然后遍历一切key履行delete操作,delete操作的详细逻辑如下:
-
首要是构建删去操作的revision,并经过revision构建boltdb的key:ibytes
-
经过appendMarkTombstone函数在boltdb的key:ibytes里边追加字符‘t’
-
构建一个不包括value的mvccpb.KeyValue存储到botldb中
-
经过Tombstone函数保存被删去的revision,注:这步于确保put revision的逻辑不一样,Tombstone除存储revision之外,还会创立一个空的generation方针追加到keyIndex的generations中,标识该key的这一代完毕。
func (tw *storeTxnWrite) delete(key []byte) { ibytes := newRevBytes() idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil { ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) } else { // TODO: remove this in v3.5 ibytes = appendMarkTombstone(nil, ibytes) } kv := mvccpb.KeyValue{Key: key} d, err := kv.Marshal() if err != nil { if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Fatal( "failed to marshal mvccpb.KeyValue", zap.Error(err), ) } else { plog.Fatalf("cannot marshal event: %v", err) } } tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) err = tw.s.kvindex.Tombstone(key, idxRev) if err != nil { if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Fatal( "failed to tombstone an existing key", zap.String("key", string(key)), zap.Error(err), ) } else { plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err) } } tw.changes = append(tw.changes, kv)
}
Compact源码剖析
经过剖析Put操作、DeleteRange操作,发现两个操作都是在索引以及boltdb中只添加数据,不删去数据的,因而,同一个key过旧的revision,或许会在数据库中永久不会被运用了,久而久之势必会形成不少磁盘/内存空间糟蹋,因而mvcc模块引入了Compact操作,清理不用内存索引及boltdb数据。
触发compact首要是依据装备策略,后台使命进行触发的,详细的触发compact逻辑的后台使命在如下包中,后续使命终究调用store的Compact函数完善索引及数据的紧缩进程:
https://github.com/etcd-io/etcd/blob/v3.4.9/etcdserver/api/v3compactor
store的Compact函数界说如下,在updateCompactRev中,首要更新compactMainRev的值,制止后续来的读操作拜访将要被Compact的数据,将CompactRev耐久化到 boltdb的meta信息中,避免Compact的进程中被异常停止,重启后无法恢复Compact流程,界说如下:
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()
ch, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
s.mu.Unlock()
return ch, err
}
s.mu.Unlock()
return s.compact(trace, rev)
}
Compact调用updateCompactRev完结元数据更新与保存后,调用store的compact履行真正的紧缩索引内revision,及revision对应的boltdb内部冗余数据。
紧缩索引的思路是:从单个key看,每个key有多个generation,每一个generation多个revision,找到key多代generation中不被紧缩的起始revision,该revision从新到旧第一个满意revision.main <= CompactRev,将其他revision悉数标记为unavailable,该进程在keyIndex的doCompact函数中完结,详细如下:
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
// walk until reaching the first revision smaller or equal to "atRev",
// and add the revision to the available map
f := func(rev revision) bool {
if rev.main <= atRev {
available[rev] = struct{}{}
return false
}
return true
}
genIdx, g := 0, &ki.generations[0]
// find first generation includes atRev or created after atRev
for genIdx < len(ki.generations)-1 {
if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
break
}
genIdx++
g = &ki.generations[genIdx]
}
revIndex = g.walk(f)
return genIdx, revIndex
}
注:doCompact在查询进程中有部分优化,如在查找方针revision,从终究一个generation开始查找,判别方针revision是否在该generation中,只需比较该generation的第一个revision。
如图【etcd索引+存储】中,假如CompactRev > 524 且 CompactRev < 768,那么keyA中revision{main:524, sub:1}便是方针revision,那么与方针revision在同一generation中,且比方针revision大的将会被保存到available中,可是保存到available中的revision未必是能够保存的,假如方针revision正好是被删去时填充的revision,会从available移除,该revision也会被紧缩,因而满意如下条件的revision都有被紧缩的或许,因而紧缩时需求从boltdb中找出一切满意以下条件的一切revision对应的数据,然后清除。
revision.main <= CompactRevde
检测方针revision是否是被删去的逻辑如下:
// compact compacts a keyIndex by removing the versions with smaller or equal
// revision than the given atRev except the largest one (If the largest one is
// a tombstone, it will not be kept).
// If a generation becomes empty during compaction, it will be removed.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
if ki.isEmpty() {
if lg != nil {
lg.Panic(
"'compact' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
} else {
plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
}
}
genIdx, revIndex := ki.doCompact(atRev, available)
g := &ki.generations[genIdx]
if !g.isEmpty() {
// remove the previous contents.
if revIndex != -1 {
g.revs = g.revs[revIndex:]
}
// remove any tombstone
if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
delete(available, g.revs[0])
genIdx++
}
}
// remove the previous generations.
ki.generations = ki.generations[genIdx:]
}
从索引中获取到available之后,会调用store的scheduleCompaction函数,该函数会顺次从store中查找满意如下条件的数据,假如key不在available中,那么该数据将会被删去,冗余数据被删去之后,将完结紧缩标识写入到boltdb的meta信息中,至此紧缩进程完毕。
revision.main <= CompactRevde
Range源码剖析
range操作是etcd中一个相对简略的操作,可是有些细节也要重视下,Range操作的进口函数如下:
func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
rev := tw.beginRev
if len(tw.changes) > 0 {
rev++
}
return tw.rangeKeys(key, end, rev, ro)
}
在Range中首要判别当时业务的changes数量是否大于0,假如同一个Txn业务中或许会出现先修正后查询的行为,因而需求rev++提高本Range业务能够看到的数据范围,以查找到本业务内改变的数据。
rangeKeys是Range操作的主函数,首要分为两步:
- 经过index的Revisions(key, end []byte, atRev int64) []revision函数找到满意[key,end)的一切key,而且将这些key中满意条件revsion.main <= rev最新创立的revision,并满意条件的revision经过数组的方式回来。
- 遍历Revisions回来的一切revision,从boltdb中取出数据,反序列化后打包到响应中。