值得信赖的区块链资讯!
比推数据  |  比推终端  |  比推英文  |  比推 APP  | 

下载比推 APP

值得信赖的区块链资讯!
iPhone
Android

Filecoin – Sector状态管理逻辑

Star Li

好久不看go语言的代码了,最近有空换换脑子,看看Sector的状态管理,纯go语言实现。看看大型项目的代码设计,对代码以及项目的设计开发有莫大的好处。Lotus的go语言部分的最新的代码,采用模块化设计。Lotus的代码也是一步步演进的,第一版的代码也是所有的逻辑耦合在一起,后面才逐步的模块化。

Lotus Miner将用户需要存储的数据“打包”成一个个Sector,并对Sector进行处理。本文就讲讲Sector的状态管理。

01

模块框架

Sector状态管理相关的模块如下:

star go 1.JPG

Sector的状态管理基于状态机。Lotus实现了通用的状态机(statemachine),在go-statemachine包中。在StateMachine之上,进一步抽象出了StateGroup,管理多个状态机。StateMachine只是实现状态的转变,具体状态是通过go-statestore进行存储。在StateMachine之上,定义状态转变的规则以及状态对应的处理函数,这些就是在具体的业务。SectorState就是Lotus管理Sector的具体业务。在这些底层模块的基础上,Lotus的相关代码调用就比较简单。先从状态管理的底层模块,StateMachine讲起:

02

StateMachine

StateMachine定义在go-statemachine包中,machine.go:

 type StateMachine struct {
     planner  Planner
     eventsIn chan Event
     name      interface{}
     st        *statestore.StoredState
     stateType reflect.Type
     stageDone chan struct{}
     closing   chan struct{}
     closed    chan struct{}
     busy int32
 }

其中,planner是抽象出来的状态机的状态转化函数。Planner接收Event,结合当前的状态user,确定下一步的处理。

type Planner func(events []Event, user interface{}) (interface{}, uint64, error)

st是存储的状态。stateType是存储状态的类型。

StateMachine的核心是run函数,分成三部分:接收Event,状态处理,下一步的调用。其中状态处理是关键:

err := fsm.mutateUser(func(user interface{}) (err error) {
    nextStep, processed, err = fsm.planner(pendingEvents, user)
    ustate = user
    if xerrors.Is(err, ErrTerminated) {
        terminated = true
        return nil
    }
    return err
})

mutateUser就是查看当前存储的状态(StoredState),并执行planner函数,并将planner处理后的状态存储。planner函数返回下一步的处理,已经处理的Event 的个数,有可能的出错。run函数会启动另外一个go routine执行nextStep。

star go 2.JPG

mutateUser具体的实现是利用go的反射(reflect),实现抽象和模块化。相关的逻辑感兴趣的小伙伴可以自己查看。

03

StateGroup

往往业务需要很多StateMachine。举个例子,Lotus的Miner存储多个Sector,每个Sector都由一个StateMachine维护独立的状态。StateGroup就是实现多个“同样”的StateMachine,定义在group.go中。

type StateGroup struct {
     sts       *statestore.StateStore
     hnd       StateHandler
     stateType reflect.Type
     closing      chan struct{}
     initNotifier sync.Once
     lk  sync.Mutex
     sms map[datastore.Key]*StateMachine
 }

其中,sms就是StateMachine的状态数组。StateHandler是StateMachine的状态处理函数接口:

 type StateHandler interface {
     Plan(events []Event, user interface{}) (interface{}, uint64, error)
 }

也就是说,在StateMachine状态机上,需要实现StateHandler接口(Plan函数,实现状态的转换)。

04

StoredState

StoredState是抽象的Key-Value的存储。相对比较简单,Get/Put接口,非常容易理解。

05

SectorState

storage-fsm实现了和Sector状态相关的业务逻辑。也就是状态的定义,状态的转换函数都是在这个包里实现。整个Sector的信息定义在storage-fsm/types.go中:

 type SectorInfo struct {
     State        SectorState
     SectorNumber abi.SectorNumber
     Nonce        uint64
     SectorType abi.RegisteredProof
     // Packing
     Pieces []Piece
     // PreCommit1
     TicketValue   abi.SealRandomness
     TicketEpoch   abi.ChainEpoch
     PreCommit1Out storage.PreCommit1Out
     // PreCommit2
     CommD *cid.Cid
     CommR *cid.Cid
     Proof []byte
     PreCommitMessage *cid.Cid
     // WaitSeed
     SeedValue abi.InteractiveSealRandomness
     SeedEpoch abi.ChainEpoch
     // Committing
     CommitMessage *cid.Cid
     InvalidProofs uint64
     // Faults
     FaultReportMsg *cid.Cid
     // Debug
     LastErr string
     Log []Log
 }

SectorInfo包括了Sector状态,Precommit1/2的数据,Committing的数据等等。其中,SectorState描述了Sector的具体状态。Sector的所有的状态定义在sector_state.go文件中:

const (
     UndefinedSectorState SectorState = ""
     // happy path
     Empty          SectorState = "Empty"
     Packing        SectorState = "Packing"       // sector not in sealStore, and not on chain
     PreCommit1     SectorState = "PreCommit1"    // do PreCommit1
     PreCommit2     SectorState = "PreCommit2"    // do PreCommit1
     PreCommitting  SectorState = "PreCommitting" // on chain pre-commit
     WaitSeed       SectorState = "WaitSeed"      // waiting for seed
     Committing     SectorState = "Committing"
     CommitWait     SectorState = "CommitWait" // waiting for message to land on chain
     FinalizeSector SectorState = "FinalizeSector"
     Proving        SectorState = "Proving"
     // error modes
     FailedUnrecoverable SectorState = "FailedUnrecoverable"
     SealFailed          SectorState = "SealFailed"
     PreCommitFailed     SectorState = "PreCommitFailed"
     ComputeProofFailed  SectorState = "ComputeProofFailed"
     CommitFailed        SectorState = "CommitFailed"
     PackingFailed       SectorState = "PackingFailed"
     Faulty              SectorState = "Faulty"        // sector is corrupted or gone for some reason
     FaultReported       SectorState = "FaultReported" // sector has been declared as a fault on chain
     FaultedFinal        SectorState = "FaultedFinal"  // fault declared on chain
 )

熟悉SDR算法的小伙伴,会发现很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。结合状态处理函数,就能很清楚各个状态的含义以及需要处理的内容。

fsm.go中Sealing结构的Plan函数是Sector状态的处理函数:

 func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
     next, err := m.plan(events, user.(*SectorInfo))
     if err != nil || next == nil {
         return nil, uint64(len(events)), err
     }
     return func(ctx statemachine.Context, si SectorInfo) error {
         err := next(ctx, si)
         if err != nil {
             log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
             return nil
         }
         return nil
     }, uint64(len(events)), nil // TODO: This processed event count is not very correct
 }

Plan函数的实现也比较简单,调用plan处理当前的状态,并返回接下来需要处理的函数。状态的处理又可以分成两部分来看:1/ 状态转换的定义 2/ 状态的处理。

针对Sector的状态转换的定义,在fsmPlanners中:

 var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
     UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
     Packing:              planOne(on(SectorPacked{}, PreCommit1)),
     PreCommit1: planOne(
         on(SectorPreCommit1{}, PreCommit2),
         on(SectorSealPreCommitFailed{}, SealFailed),
         on(SectorPackingFailed{}, PackingFailed),
     ),
     PreCommit2: planOne(
         on(SectorPreCommit2{}, PreCommitting),
         on(SectorSealPreCommitFailed{}, SealFailed),
         on(SectorPackingFailed{}, PackingFailed),
     ),
...

比如说,在PreCommit1的状态,接收到SectorPreCommit1事件,将进入PreCommit2状态。所有Sector的状态转换如下图:

star go 3.JPG

在处理好状态好,就进入相应状态的处理程序。以PreCommit1状态为例,相应的处理函数是handlePreCommit1。

func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
     tok, epoch, err := m.api.ChainHead(ctx.Context())
     ...
     pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
     if err != nil {
         return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
     }
   ...
 }

很清楚看出,在PreCommit1的处理函数中,会通过SealPrecommit1调用rust-fil-proofs实现Precommit1的计算。最后总结一下,各个状态的含义:

Empty – 空状态

Packing – 打包状态,多个Piece填充到一个Sector中

PreCommit1 – PreCommit1计算

PreCommit2 – PreCommit2计算

PreCommitting – 提交Precommit2的结果到链上

WaitSeed – 等待随机种子(给定10个区块的时间,让随机数种子不可提前预测)

Committing – 计算Commit1/Commit2,并将证明提交到链上

CommitWait – 等待链上确认 FinalizeSector – Sector状态确定

总结:

Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。

来源:Star Li

说明:比推所有文章只代表作者观点,不构成投资建议
原文链接:https://www.bitpush.news/articles/833906

比推快讯

更多 >>

下载比推 APP

24 小时追踪区块链行业资讯、热点头条、事实报道、深度洞察。

邮件订阅

金融科技决策者们都在看的区块链简报与深度分析,「比推」帮你划重点。