Skip to content

Commit

Permalink
storagefsm: Fix batch deal packing behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed May 30, 2021
1 parent 5445b05 commit b4cc48c
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 78 deletions.
135 changes: 73 additions & 62 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, sta
}

func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
res, data, err := CreateClientFile(ctx, client, rseed)
res, data, err := CreateClientFile(ctx, client, rseed, 0)
if err != nil {
t.Fatal(err)
}
Expand All @@ -72,8 +72,11 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
testRetrieval(t, ctx, client, fcid, &info.PieceCID, carExport, data)
}

func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api.ImportRes, []byte, error) {
data := make([]byte, 1600)
func CreateClientFile(ctx context.Context, client api.FullNode, rseed, size int) (*api.ImportRes, []byte, error) {
if size == 0 {
size = 1600
}
data := make([]byte, size)
rand.New(rand.NewSource(int64(rseed))).Read(data)

dir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-")
Expand Down Expand Up @@ -119,7 +122,7 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio

// Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
res, _, err := CreateClientFile(s.ctx, s.client, rseed, 0)
require.NoError(t, err)

upds, err := client.ClientGetDealUpdates(s.ctx)
Expand Down Expand Up @@ -186,68 +189,76 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
}

func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(4)

// Set max deals per publish deals message to maxDealsPerMsg
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Options(
node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: maxDealsPerMsg,
})),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 1,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 2,
AlwaysKeepUnsealedCopy: true,
}, nil
}, nil
}),
),
Preseal: PresealGenesis,
}}

// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()

// Starts a deal and waits until it's published
runDealTillSeal := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
require.NoError(t, err)
run := func(piece, deals, expectSectors int) func(t *testing.T) {
return func(t *testing.T) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(deals)

// Set max deals per publish deals message to maxDealsPerMsg
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Options(
node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: maxDealsPerMsg,
})),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 1,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 2,
AlwaysKeepUnsealedCopy: true,
}, nil
}, nil
}),
),
Preseal: PresealGenesis,
}}

// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()

// Starts a deal and waits until it's published
runDealTillSeal := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece)
require.NoError(t, err)

dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
}

dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
}
// Run maxDealsPerMsg+1 deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
rseed := rseed
go func() {
runDealTillSeal(rseed)
done <- struct{}{}
}()
}

// Run maxDealsPerMsg+1 deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
rseed := rseed
go func() {
runDealTillSeal(rseed)
done <- struct{}{}
}()
}
// Wait for maxDealsPerMsg of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
}

// Wait for maxDealsPerMsg of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
sl, err := sn[0].SectorsList(s.ctx)
require.NoError(t, err)
require.GreaterOrEqual(t, len(sl), expectSectors)
require.LessOrEqual(t, len(sl), expectSectors+1)
}
}

sl, err := sn[0].SectorsList(s.ctx)
require.NoError(t, err)
require.GreaterOrEqual(t, len(sl), 4)
require.LessOrEqual(t, len(sl), 5)
t.Run("4-p1600B", run(1600, 4, 4))
t.Run("4-p513B", run(513, 4, 2))
t.Run("32-p257B", run(257, 32, 8))
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
Expand Down Expand Up @@ -430,7 +441,7 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod
si, err := miner.SectorsStatus(ctx, snum, false)
require.NoError(t, err)

t.Logf("Sector state: %s", si.State)
t.Logf("Sector %d state: %s", snum, si.State)
if si.State == api.SectorState(sealing.WaitDeals) {
require.NoError(t, miner.SectorStartSealing(ctx, snum))
}
Expand Down
4 changes: 2 additions & 2 deletions cli/test/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)

// Create a deal (non-interactive)
// client deal --start-epoch=<start epoch> <cid> <miner addr> 1000000attofil <duration>
res, _, err := test.CreateClientFile(ctx, clientNode, 1)
res, _, err := test.CreateClientFile(ctx, clientNode, 1, 0)
require.NoError(t, err)
startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12)
dataCid := res.Root
Expand All @@ -60,7 +60,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
// <miner addr>
// "no" (verified client)
// "yes" (confirm deal)
res, _, err = test.CreateClientFile(ctx, clientNode, 2)
res, _, err = test.CreateClientFile(ctx, clientNode, 2, 0)
require.NoError(t, err)
dataCid2 := res.Root
duration = fmt.Sprintf("%d", build.MinDealDuration/builtin.EpochsInDay)
Expand Down
4 changes: 4 additions & 0 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
AddPiece: planOne(
on(SectorPieceAdded{}, WaitDeals),
apply(SectorStartPacking{}),
apply(SectorAddPiece{}),
on(SectorAddPieceFailed{}, AddPieceFailed),
),
Packing: planOne(on(SectorPacked{}, GetTicket)),
Expand Down Expand Up @@ -534,6 +535,7 @@ func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error))

func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
eloop:
for i, event := range events {
if gm, ok := event.User.(globalMutator); ok {
gm.applyGlobal(state)
Expand All @@ -556,6 +558,8 @@ func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err e
if err != nil || !more {
return uint64(i + 1), err
}

continue eloop
}

_, ok := event.User.(Ignorable)
Expand Down
36 changes: 25 additions & 11 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e

m.inputLk.Lock()

sid := m.minerSectorID(sector.SectorNumber)

if len(m.assignedPieces[sid]) > 0 {
m.inputLk.Unlock()
// got assigned more pieces in the AddPiece state
return ctx.Send(SectorAddPiece{})
}

started, err := m.maybeStartSealing(ctx, sector, used)
if err != nil || started {
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
Expand All @@ -36,16 +44,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
return err
}

m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
used: used,
maybeAccept: func(cid cid.Cid) error {
// todo check deal start deadline (configurable)
if _, has := m.openSectors[sid]; !has {
m.openSectors[sid] = &openSector{
used: used,
maybeAccept: func(cid cid.Cid) error {
// todo check deal start deadline (configurable)
m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)

sid := m.minerSectorID(sector.SectorNumber)
m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)

return ctx.Send(SectorAddPiece{})
},
return ctx.Send(SectorAddPiece{})
},
}
}

go func() {
Expand Down Expand Up @@ -350,20 +358,26 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue
}

avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used

if mt.size > avail {
continue
}

err := m.openSectors[mt.sector].maybeAccept(mt.deal)
if err != nil {
m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
}

m.openSectors[mt.sector].used += mt.padding + mt.size

m.pendingPieces[mt.deal].assigned = true
delete(toAssign, mt.deal)

if err != nil {
log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
continue
}

delete(m.openSectors, mt.sector)
}

if len(toAssign) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,13 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err
return nil, err
}

out := make([]abi.SectorNumber, len(sectors))
for i, sector := range sectors {
out := make([]abi.SectorNumber, 0, len(sectors))
for _, sector := range sectors {
if sector.State == sealing.UndefinedSectorState {
continue // sector ID not set yet
}

out[i] = sector.SectorNumber
out = append(out, sector.SectorNumber)
}
return out, nil
}
Expand Down

0 comments on commit b4cc48c

Please sign in to comment.