The path by which data takes effect in etcd is essentially the process of the Leader replicating a log entry to a majority of raft nodes and then applying it to the application-layer database. A restart of the raft state machine at most loses in-memory log entries, and after the restart the node can recover them by syncing from the Leader. Applying the transaction data carried in log entries to the store, however, is more involved: an entry must be neither skipped nor applied twice. Crash safety in etcd is therefore largely about making the apply path of log entries tolerate node crashes.
The apply flow of an etcd log entry involves several cooperating components, such as the raft state machine, mvcc, and the WAL. These components were introduced in earlier articles of this column; here we focus on how etcd orchestrates them and how it recovers data and restores consistency after a crash and restart. The apply flow is shown in the figure [apply flow diagram].
etcd's crash safety can be summarized as: persist data across multiple components at runtime, and compensate across those components during recovery. The key questions are how the components interact, what each one persists, and how the compensation works after a restart. This article therefore walks through the apply flow in stages: producing Ready data, writing the WAL, applying normal log entries, and recovering data on restart, highlighting the crash-safe steps that keep data consistent after a restart.
Producing Ready data
Ready is the data object through which the raft state machine hands ready-to-process data to the upper application layer: entries/snapshots that can be applied, entries that need to be written to the WAL, and so on. We only care about the fields related to applying log entries; the definition is as follows:
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
// ...
}
The key fields of Ready relevant to the apply flow are listed below; a sketch of the handling order they imply follows the list.
**pb.HardState:** raft-internal state that must be persisted, such as the Term, the Vote target, and the Commit index.
**Entries:** newly produced log entries.
**Snapshot:** a snapshot the Leader sends to a Follower so it can catch up with the Leader's log quickly.
**CommittedEntries:** entries that have already been committed and are waiting to be applied to the state machine.
**MustSync:** a flag telling the upper layer whether it must call the system sync API to flush the data to disk immediately.
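Taken together, these fields impose a fixed handling order on the application for every Ready. Below is a minimal sketch of that contract, following the raft package's documented usage loop. It is simplified: error handling, ticks and the remaining Ready fields such as Messages are elided, and the import path assumes etcd v3.4.

package applysketch

import "go.etcd.io/etcd/raft"

// readyLoop shows the order in which an application must process each Ready.
func readyLoop(n raft.Node, done <-chan struct{}) {
	for {
		select {
		case rd := <-n.Ready():
			// 1. Persist rd.HardState, rd.Entries and rd.Snapshot to stable storage
			//    (WAL + snapshot files); fsync before continuing if rd.MustSync is set.
			// 2. Send the outgoing messages to the peers, only after the writes above.
			// 3. Apply rd.Snapshot and then rd.CommittedEntries to the state machine.
			// 4. Tell raft that this Ready has been fully processed.
			_ = rd // the persistence/apply steps above are elided in this sketch
			n.Advance()
		case <-done:
			return
		}
	}
}

etcd splits this contract between raftNode.start (persistence and message sending) and applyAll (applying), as the rest of this article shows.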
After the raft state machine produces a Ready, it sends it out over the ready channel and uses advancec to wait for the upper layer to finish processing it. The code is as follows:
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
// other code ...
for {
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
select {
// other case ...
case <-n.tickc:
n.rn.Tick()
case readyc <- rd:
n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
case <-n.stop:
close(n.done)
return
}
}
}
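One detail worth calling out in run() is the nil-channel idiom: a send on a nil channel blocks forever, so setting readyc to nil while advancec is armed disables the `readyc <- rd` case until the previous Ready has been consumed. A tiny, generic illustration (not etcd code):

package applysketch

// nilChannelSelect illustrates the trick run() relies on above: a send on a nil
// channel blocks forever, so a select case guarded by a nil channel is
// effectively disabled until the channel is re-armed.
func nilChannelSelect(haveReady, waitingForAdvance bool) bool {
	var readyc chan int // nil: the send case below cannot fire
	if haveReady && !waitingForAdvance {
		readyc = make(chan int, 1) // arm the case only when a Ready may be emitted
	}
	select {
	case readyc <- 42: // never chosen while readyc is nil
		return true
	default: // stands in for the other cases of the real select loop
		return false
	}
}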
Writing the WAL
After the raft state machine sends out the Ready, the application layer receives it by listening on the ready channel. The relevant flow lives in the following file:
https://github.com/etcd-io/etcd/blob/main/server/etcdserver/raft.go
Since raftNode's handling of the Ready struct in etcdserver/raft is central, the complete apply-related code path is reproduced below, with the explanations added as comments:
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
case rd := <-r.Ready():
// The data is applied to the database mainly in the applyAll function; the notifyc channel is created
// so that this goroutine can coordinate progress with the applyAll goroutine.
notifyc := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
}
// Send the apply data to the applyAll goroutine.
select {
case r.applyc <- ap:
case <-r.stopped:
return
}
// The next two steps persist data:
// (1) If the Ready carries a snapshot, persist the snapshot and then write its metadata to the WAL; this happens in
// func (st *storage) SaveSnap(snap raftpb.Snapshot) error, step 3 in the [apply flow diagram].
// (2) Write the HardState and the newly produced entries to the WAL, finishing with a sync to flush them to disk, steps 4 and 5 in the [apply flow diagram].
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("failed to save Raft snapshot %v", err)
}
}
}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
} else {
plog.Fatalf("failed to save state and entries error: %v", err)
}
}
if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
if !raft.IsEmptySnap(rd.Snapshot) {
// Force WAL to fsync its hard state before Release() releases
// old data from the WAL. Otherwise could get an error like:
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
if err := r.storage.Sync(); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
} else {
plog.Fatalf("failed to sync Raft snapshot %v", err)
}
}
// If a snapshot is present, etcdserver's applyAll first calls applySnapshot, which waits on notifyc;
// the apply-snapshot flow therefore only starts after the snapshot has been flushed to disk here.
// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
// Hand the snapshot to the raft in-memory storage (raftStorage), step 8.2 in the [apply flow diagram].
r.raftStorage.ApplySnapshot(rd.Snapshot)
if r.lg != nil {
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
} else {
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
}
// Release snapshot files that are no longer needed.
if err := r.storage.Release(rd.Snapshot); err != nil {
if r.lg != nil {
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
} else {
plog.Fatalf("failed to release Raft wal %v", err)
}
}
}
// Append the log entries to the raft in-memory storage, step 8.1 in the [apply flow diagram].
r.raftStorage.Append(rd.Entries)
// Coordinate with the applyAll flow:
// (1) A follower or candidate not only tells applyAll that the raft log, HardState and snapshot are all on disk,
// it also waits for the apply flow to finish whenever the Ready contains an EntryConfChange. This keeps
// cluster-affecting messages (e.g. campaign messages) from being sent to a member that is about to be removed:
// once the removal is applied, that peer is dropped from the transport, so no further messages reach it.
// (2) The leader only needs to tell applyAll that the raft log, HardState and snapshot are all on disk.
if !islead {
// finish processing incoming messages before we signal raftdone chan
msgs := r.processMessages(rd.Messages)
// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
notifyc <- struct{}{}
// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
// Otherwise we might incorrectly count votes (e.g. votes from removed members).
// Also slow machine's follower raft-layer could proceed to become the leader
// on its own single-node cluster, before apply-layer applies the config change.
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later on if it causes unnecessary long blocking issues.
waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}
if waitApply {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
select {
case notifyc <- struct{}{}:
case <-r.stopped:
return
}
}
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
}
// Tell the raft state machine that this Ready has been fully processed.
r.Advance()
case <-r.stopped:
return
}
}
}()
}
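The r.storage used above combines the WAL and the snapshotter. The following is a simplified sketch of what SaveSnap and Save amount to, not the real etcdserver implementation (the real SaveSnap also releases old WAL locks, and the ordering of the snapshot file versus the WAL record has changed across versions); import paths assume etcd v3.4.

package applysketch

import (
	"go.etcd.io/etcd/etcdserver/api/snap"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/wal"
	"go.etcd.io/etcd/wal/walpb"
)

// storage pairs the WAL with the snapshotter, as etcdserver's storage type does.
type storage struct {
	*wal.WAL
	*snap.Snapshotter
}

// SaveSnap writes the snapshot body via the snapshotter and then records its
// (term, index) in the WAL, so that recovery can match a WAL record to a .snap file.
func (st *storage) SaveSnap(s raftpb.Snapshot) error {
	if err := st.Snapshotter.SaveSnap(s); err != nil {
		return err
	}
	return st.WAL.SaveSnapshot(walpb.Snapshot{Index: s.Metadata.Index, Term: s.Metadata.Term})
}

// Save appends the HardState and the new entries to the WAL; wal.WAL.Save
// syncs the file when durability is required (cf. Ready.MustSync).
func (st *storage) Save(hs raftpb.HardState, ents []raftpb.Entry) error {
	return st.WAL.Save(hs, ents)
}

Note how raftNode.start calls SaveSnap before Save and forces an extra Sync before releasing old WAL data, matching the ordering requirements described in the upstream comments above.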
Applying log entries
The goroutine that handles Ready mainly interacts with etcdserver's applyAll flow, coordinating progress through the notify channel. The applyAll flow is analyzed below, with the explanations placed in code comments:
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
// If there is a snapshot, apply it to the store, step 7.1 in the [apply flow diagram].
s.applySnapshot(ep, apply)
// If there are entries, apply them to the store, step 7.2 in the [apply flow diagram].
s.applyEntries(ep, apply)
proposalsApplied.Set(float64(ep.appliedi))
s.applyWait.Trigger(ep.appliedi)
// Wait for the Ready-handling goroutine to persist the snapshot, HardState and raft log to disk.
// wait for the raft routine to finish the disk writes before triggering a
// snapshot. or applied index might be greater than the last index in raft
// storage, since the raft routine might be slower than apply routine.
<-apply.notifyc
// New entries have just been applied, so check whether the conditions for taking a new snapshot are met;
// if they are, create one.
s.triggerSnapshot(ep)
// other code ...
}
func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if raft.IsEmptySnap(apply.snapshot) {
return
}
// Check that the snapshot being applied here is valid.
lg := s.getLogger()
if apply.snapshot.Metadata.Index <= ep.appliedi {
if lg != nil {
lg.Panic(
"unexpected leader snapshot from outdated index",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
apply.snapshot.Metadata.Index, ep.appliedi)
}
}
// Wait for the Ready-handling goroutine to persist the snapshot to disk. The snapshot is persisted through the
// snapshotter, and openSnapshotBackend below also relies on the snapshotter, so the snapshot must have been saved there before openSnapshotBackend is called.
// wait for raftNode to persist snapshot onto the disk
<-apply.notifyc
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
if err != nil {
if lg != nil {
lg.Panic("failed to open snapshot backend", zap.Error(err))
} else {
plog.Panic(err)
}
}
// (1) Recover the lessor state from newbe (backend.Backend).
// (2) Recover the current store's index and data from newbe (backend.Backend).
if err := s.kv.Restore(newbe); err != nil {
if lg != nil {
lg.Panic("failed to restore mvcc store", zap.Error(err))
} else {
plog.Panicf("restore KV error: %v", err)
}
}
// Set consistIndex to the index of the last applied entry recorded in the restored ConsistentWatchableKV store.
s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
if lg != nil {
lg.Info("restored mvcc store")
} else {
plog.Info("finished restoring mvcc store")
}
// Use the new backend to restore the rest of the runtime state, e.g. the auth store, cluster membership, and a rebuilt transport.
ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
}
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
if len(apply.entries) == 0 {
return
}
// Check that the entries to be applied in this batch are valid.
firsti := apply.entries[0].Index
if firsti > ep.appliedi+1 {
if lg := s.getLogger(); lg != nil {
lg.Panic(
"unexpected committed entry index",
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("first-committed-entry-index", firsti),
)
} else {
plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
}
}
// Drop the entries that have already been applied from the batch.
var ents []raftpb.Entry
if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
ents = apply.entries[ep.appliedi+1-firsti:]
}
if len(ents) == 0 {
return
}
// Call apply to perform the real apply of the log entries.
var shouldstop bool
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
// Iterate over all entries and apply each one according to its type.
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
for i := range es {
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
s.applyEntryNormal(&e)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
case raftpb.EntryConfChange:
// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.setConsistentIndex(e.Index)
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
default:
if lg := s.getLogger(); lg != nil {
lg.Panic(
"unknown entry type; must be either EntryNormal or EntryConfChange",
zap.String("type", e.Type.String()),
)
} else {
plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
}
}
appliedi, appliedt = e.Index, e.Term
}
return appliedt, appliedi, shouldStop
}
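Before moving on, a worked example of the dedup arithmetic in applyEntries may help: suppose appliedi is 10 and the re-delivered committed entries carry indexes 9 through 13, so firsti is 9; then appliedi+1-firsti is 2, and only entries[2:] (indexes 11 to 13) get applied. A self-contained sketch of that filter (illustrative; the panic mirrors the gap check above, and the import path assumes etcd v3.4):

package applysketch

import "go.etcd.io/etcd/raft/raftpb"

// dropApplied removes entries whose index is <= appliedi, panicking if the
// batch would leave a gap (firsti > appliedi+1), just as applyEntries does.
func dropApplied(entries []raftpb.Entry, appliedi uint64) []raftpb.Entry {
	if len(entries) == 0 {
		return nil
	}
	firsti := entries[0].Index
	if firsti > appliedi+1 {
		panic("unexpected committed entry index: gap after the applied index")
	}
	if appliedi+1-firsti < uint64(len(entries)) {
		return entries[appliedi+1-firsti:]
	}
	return nil // every entry in the batch was already applied
}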
When the apply is actually executed, different entry types are handled differently. There are two main entry types: read/write transaction entries and raft configuration-change entries. Configuration changes are not covered in this section; below we focus on the apply flow of normal log entries.
Applying read/write transaction entries
A normal transaction entry is mostly a key-value read or write, so it mainly interacts with the mvcc / key-value storage components. The key code is explained in the comments:
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// etcd v3 carries two generations of storage:
// (1) the v2 store, a simple key-value store where re-applying a log entry does not break consistency;
// (2) the v3 store, the transactional mvcc store, where re-applying a log entry does break consistency.
// As described in the mvcc article, the mvcc object is constructed with a ConsistentIndexGetter;
// etcdserver passes s.consistIndex into that constructor, so s.consistIndex.setConsistentIndex(e.Index)
// lets the mvcc transaction's End function read the index of the entry associated with the current transaction.
shouldApplyV3 := false
if e.Index > s.consistIndex.ConsistentIndex() {
// set the consistent index of current executing entry
s.consistIndex.setConsistentIndex(e.Index)
shouldApplyV3 = true
}
// other code ...
// Apply the request to the v2 store.
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
var r pb.Request
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
return
}
if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.w.Trigger(req.ID, s.applyV2Request(req))
return
}
// If the entry has not yet been applied to the v3 store, run its transaction through the v3 applier.
// When the transaction finishes, its End function reads the entry index set above through the
// ConsistentIndexGetter (s.consistIndex.setConsistentIndex(e.Index)) and stores that index together with the
// transaction's changes, which are flushed to boltdb at a suitable time.
if !shouldApplyV3 {
return
}
id := raftReq.ID
if id == 0 {
id = raftReq.Header.ID
}
var ar *applyResult
needResult := s.w.IsRegistered(id)
if needResult || !noSideEffect(&raftReq) {
if !needResult && raftReq.Txn != nil {
removeNeedlessRangeReqs(raftReq.Txn)
}
ar = s.applyV3.Apply(&raftReq)
}
// other code ...
}
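The crash-safety of the v3 path hinges on one property: the entry index and the key-value change reach boltdb in the same transaction. Below is a minimal, self-contained sketch of that idea using bbolt directly; the bucket and key names mirror etcd's layout, but this is not etcd's API, and mvcc does the equivalent inside its transaction End.

package applysketch

import (
	"encoding/binary"

	bolt "go.etcd.io/bbolt"
)

var (
	bucketMeta         = []byte("meta")
	bucketKey          = []byte("key")
	keyConsistentIndex = []byte("consistent_index")
)

// applyPut writes the key-value change and the entry index in ONE boltdb
// transaction, so after a crash either both are durable or neither is.
func applyPut(db *bolt.DB, entryIndex uint64, k, v []byte) error {
	return db.Update(func(tx *bolt.Tx) error {
		mb, err := tx.CreateBucketIfNotExists(bucketMeta)
		if err != nil {
			return err
		}
		kb, err := tx.CreateBucketIfNotExists(bucketKey)
		if err != nil {
			return err
		}
		idx := make([]byte, 8)
		binary.BigEndian.PutUint64(idx, entryIndex)
		if err := mb.Put(keyConsistentIndex, idx); err != nil {
			return err
		}
		return kb.Put(k, v)
	})
}

// alreadyApplied reports whether an entry index is covered by the last durable
// backend transaction; on replay such an entry must be skipped.
func alreadyApplied(db *bolt.DB, entryIndex uint64) (bool, error) {
	var applied uint64
	err := db.View(func(tx *bolt.Tx) error {
		if b := tx.Bucket(bucketMeta); b != nil {
			if v := b.Get(keyConsistentIndex); len(v) == 8 {
				applied = binary.BigEndian.Uint64(v)
			}
		}
		return nil
	})
	return entryIndex <= applied, err
}

Because the pair is atomic, a crash either persists both the data and the index or neither, which is what makes the shouldApplyV3 check above safe when entries are replayed.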
Data recovery on restart
The sections above deliberately skipped the apply of raft configuration-change entries, since a dedicated article will cover them later; here we focus on restart recovery for normal log entries. When analyzing the restart flow, though, it matters little whether an entry is a normal entry or a configuration change, because the flow is largely the same; you may simply assume that no configuration change has ever happened in the cluster's history.
From the description above, data is persisted in three places, in order of persistence: snapshot, WAL, and transaction data:
snapshot: a complete copy of the snapshot data.
WAL: log entries, the raft HardState, snapshot metadata, and so on.
transaction data: as covered with the mvcc component's put/range/del operations, the user's transaction data together with the index of the applied log entry.
There are several restart scenarios for etcdserver; here we focus on a normal restart with an existing WAL. The main flow is in the following file:
https://github.com/etcd-io/etcd/blob/v3.4.9/etcdserver/server.go#L410
The first step is to check whether a usable snapshot exists, and if so, build a snapshot object:
// First check the validity of the entries in the WAL files and extract the snapshot metadata records from them.
walSnaps, serr := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if serr != nil {
return nil, serr
}
// Use the snapshot metadata to find the newest usable snapshot data and build the snapshot object.
snapshot, err = ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}
// The WAL is written sequentially, so later records are newer; the newest snapshot whose metadata matches a WAL record is the "newest available" one.
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
m := snapshot.Metadata
for i := len(walSnaps) - 1; i >= 0; i-- {
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
return true
}
}
return false
})
}
With the snapshot object in hand, the storage objects can be rebuilt from it:
// Use the snapshot to recover the v2 key-value store and the v3 backend.
if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
if cfg.Logger != nil {
cfg.Logger.Panic("failed to recover from snapshot")
} else {
plog.Panicf("recovered store from snapshot error: %v", err)
}
}
// other code ...
if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
if cfg.Logger != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
} else {
plog.Panicf("recovering backend from snapshot error: %v", err)
}
}
// other code ...
}
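recoverSnapshotBackend's job can be summarized as a single decision: if the existing backend has already applied up to (or beyond) the snapshot index, keep it; otherwise switch to the database file shipped with the snapshot. A hedged sketch of that decision follows; the function and parameter names (and the snapshotDBPath callback) are illustrative, not etcd's API.

package applysketch

// chooseBackend sketches the decision: keep the current backend if it has
// already applied up to (or past) the snapshot index; otherwise use the
// database file that accompanies the snapshot.
func chooseBackend(currentConsistentIndex, snapshotIndex uint64,
	snapshotDBPath func() (string, error)) (keepCurrent bool, dbPath string, err error) {
	if currentConsistentIndex >= snapshotIndex {
		return true, "", nil // the current backend already contains the snapshot's data
	}
	dbPath, err = snapshotDBPath() // e.g. resolved via the snapshotter
	return false, dbPath, err
}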
Once the snapshot has been reconstructed, the flow moves on to restarting the raft node; the details are explained in the code below:
func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
// Extract the metadata from the snapshot.
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
// Use the walsnap metadata to locate the right WAL files, read them, and return the node/cluster IDs, the HardState, the log entries, and so on.
w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap)
// We describe a normal raft-node restart here and ignore the case where a new cluster is created after restart.
// Set the cluster ID the raft object belongs to.
cl := membership.NewCluster(cfg.Logger, "")
cl.SetID(id, cid)
// Hand the snapshot to the raft MemoryStorage.
s := raft.NewMemoryStorage()
if snapshot != nil {
s.ApplySnapshot(*snapshot)
}
// Store the HardState and the log entries into the storage.
s.SetHardState(st)
s.Append(ents)
// Build the raft state-machine object. At this point raft can see:
// (1) the log entries;
// (2) which entries have been committed.
// Note: raft.Config.Applied is left at zero here, so raft does not know which entries were already applied.
// At this point essentially all of the pre-restart data has been loaded; the two inconsistency scenarios a
// crash could cause, and why they are handled safely, are analyzed right after this code.
c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
}
// other code ...
// Restart the raft node.
n := raft.RestartNode(c)
raftStatusMu.Lock()
raftStatus = n.Status
raftStatusMu.Unlock()
return id, cl, n, s, w
}
At this point essentially all of the data from before the restart has been loaded, so we can analyze the two inconsistency scenarios a crash could cause:
(1) An entry is applied twice: when the raft object is built, the Applied field of raft.Config is not used, so raft does not know which entries have already been applied. When raft builds a Ready, it returns every entry in the raft log in the range (max(applied, firstIndex), committed] as committed entries for this round. If the storage contains entries that were already applied, they may be packed into the Ready again, but at apply time they are filtered out rather than re-applied, because the store exposes an interface that returns the index of the last applied entry.
(2) A crash before an applied entry is persisted: as long as the WAL is cleaned up conservatively, so that the entries kept in the WAL overlap with or immediately follow the already-applied entries, etcd will neither lose transaction entries in a crash nor apply an entry twice. etcd's snapshot and WAL mechanisms guarantee exactly this: old WAL files are only cleaned up after a snapshot is created, only entries already packed into a snapshot are removed, every entry in a snapshot has been applied, and the cleanup is conservative enough to keep one extra WAL file. Even if applied entries are still sitting in the boltdb tx batch buffer and have not been persisted when etcd crashes before a snapshot is taken, those entries are already persisted in the WAL together with the persisted commit index, so the raft state machine re-delivers them after the restart, and no entry is lost.
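Putting the two scenarios together, the replay decision after a restart boils down to two index comparisons. A summary sketch (illustrative, not etcd's code; parameter names are hypothetical):

package applysketch

// shouldApplyToV3 reports whether a re-delivered committed entry must still be
// applied to the v3 store after a restart.
func shouldApplyToV3(entryIndex, appliedIndex, backendConsistentIndex uint64) bool {
	if entryIndex <= appliedIndex {
		return false // already applied during this run (scenario 1)
	}
	// If the backend's consistent_index already covers the entry, its changes are
	// durable in boltdb and re-applying them would break consistency; otherwise
	// the WAL still has the entry and it is simply applied now (scenario 2).
	return entryIndex > backendConsistentIndex
}

WAL retention guarantees that every entry with an index greater than the persisted consistent index is still available for replay, so nothing is lost and nothing is applied twice.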