Skip to content

Commit

Permalink
Unit tests for sector batchers
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jun 9, 2021
1 parent c98b6f6 commit 1fa68d4
Show file tree
Hide file tree
Showing 7 changed files with 766 additions and 12 deletions.
18 changes: 10 additions & 8 deletions extern/storage-sealing/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

const arp = abi.RegisteredAggregationProof_SnarkPackV1

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi

type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
Expand All @@ -37,9 +39,9 @@ type CommitBatcherApi interface {
}

type AggregateInput struct {
spt abi.RegisteredSealProof
info proof5.AggregateSealVerifyInfo
proof []byte
Spt abi.RegisteredSealProof
Info proof5.AggregateSealVerifyInfo
Proof []byte
}

type CommitBatcher struct {
Expand Down Expand Up @@ -249,15 +251,15 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

res.Sectors = append(res.Sectors, id)
params.SectorNumbers.Set(uint64(id))
infos = append(infos, p.info)
infos = append(infos, p.Info)
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Number < infos[j].Number
})

for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].proof)
proofs = append(proofs, b.todo[info.Number].Proof)
}

mid, err := address.IDFromAddress(b.maddr)
Expand All @@ -267,7 +269,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
Miner: abi.ActorID(mid),
SealProof: b.todo[infos[0].Number].spt,
SealProof: b.todo[infos[0].Number].Spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
Expand Down Expand Up @@ -340,7 +342,7 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.proof,
Proof: info.Proof,
}

if err := params.MarshalCBOR(enc); err != nil {
Expand Down Expand Up @@ -426,7 +428,7 @@ func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
for _, s := range b.todo {
res = append(res, abi.SectorID{
Miner: abi.ActorID(mid),
Number: s.info.Number,
Number: s.Info.Number,
})
}

Expand Down
290 changes: 290 additions & 0 deletions extern/storage-sealing/commit_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
package sealing_test

import (
"bytes"
"context"
"sort"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

func TestCommitBatcher(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)

ctx := context.Background()

as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil
}

fc := sealing.FeeConfig{
MaxPreCommitGasFee: types.FromFil(1),
MaxCommitGasFee: types.FromFil(1),
MaxTerminateGasFee: types.FromFil(1),
}

maxBatch := miner5.MaxAggregatedSectors
minBatch := miner5.MinAggregatedSectors

cfg := func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,

BatchPreCommits: true,
MinPreCommitBatch: 1,
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,

AggregateCommits: true,
MinCommitBatch: minBatch,
MaxCommitBatch: maxBatch,
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,

TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
}, nil
}

type promise func(t *testing.T)
type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise

actions := func(as ...action) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var ps []promise
for _, a := range as {
p := a(t, s, pcb)
if p != nil {
ps = append(ps, p)
}
}

if len(ps) > 0 {
return func(t *testing.T) {
for _, p := range ps {
p(t)
}
}
}
return nil
}
}

addSector := func(sn abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var pcres sealiface.CommitBatchRes
var pcerr error
done := sync.Mutex{}
done.Lock()

si := sealing.SectorInfo{
SectorNumber: sn,
}

s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)

go func() {
defer done.Unlock()
pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{
Info: proof5.AggregateSealVerifyInfo{
Number: sn,
},
})
}()

return func(t *testing.T) {
done.Lock()
require.NoError(t, pcerr)
require.Empty(t, pcres.Error)
require.Contains(t, pcres.Sectors, si.SectorNumber)
}
}
}

addSectors := func(sectors []abi.SectorNumber) action {
as := make([]action, len(sectors))
for i, sector := range sectors {
as[i] = addSector(sector)
}
return actions(as...)
}

waitPending := func(n int) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
require.Eventually(t, func() bool {
p, err := pcb.Pending(ctx)
require.NoError(t, err)
return len(p) == n
}, time.Second*5, 10*time.Millisecond)

return nil
}
}

expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)

ti := len(expect)
batch := false
if ti >= minBatch {
batch = true
ti = 1
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil).Times(len(expect))
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))

s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
if batch {
var params miner5.ProveCommitAggregateParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
for _, number := range expect {
set, err := params.SectorNumbers.IsSet(uint64(number))
require.NoError(t, err)
require.True(t, set)
}
} else {
var params miner5.ProveCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
}
return true
})).Times(ti)
return nil
}
}

flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)

batch := len(expect) >= minBatch

r, err := pcb.Flush(ctx)
require.NoError(t, err)
if batch {
require.Len(t, r, 1)
require.Empty(t, r[0].Error)
sort.Slice(r[0].Sectors, func(i, j int) bool {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
} else {
require.Len(t, r, len(expect))
for _, res := range r {
require.Len(t, res.Sectors, 1)
require.Empty(t, res.Error)
}
sort.Slice(r, func(i, j int) bool {
return r[i].Sectors[0] < r[j].Sectors[0]
})
for i, res := range r {
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
}
}

return nil
}
}

getSectors := func(n int) []abi.SectorNumber {
out := make([]abi.SectorNumber, n)
for i := range out {
out[i] = abi.SectorNumber(i)
}
return out
}

tcs := map[string]struct {
actions []action
}{
"addSingle": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
},
},
"addTwo": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
},
},
"addMax": {
actions: []action{
expectSend(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}

for name, tc := range tcs {
tc := tc

t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()

// create them mocks
pcapi := mocks.NewMockCommitBatcherApi(mockCtrl)

pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{})

var promises []promise

for _, a := range tc.actions {
p := a(t, pcapi, pcb)
if p != nil {
promises = append(promises, p)
}
}

for _, p := range promises {
p(t)
}

err := pcb.Stop(ctx)
require.NoError(t, err)
})
}
}

type fakeProver struct{}

func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
return []byte("Trust me, I'm a proof"), nil
}

var _ ffiwrapper.Prover = &fakeProver{}
Loading

0 comments on commit 1fa68d4

Please sign in to comment.