Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sealing fsm: Handle inputLk correctly #8291

Merged
merged 1 commit into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,19 @@ func generateFakePoSt(sectorInfo []proof.SectorInfo, rpt func(abi.RegisteredSeal
}

func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
off := storiface.UnpaddedByteIndex(0)
var piece cid.Cid
for _, c := range mgr.sectors[sector.ID].pieces {
piece = c
if off >= offset {
break
}
off += storiface.UnpaddedByteIndex(len(mgr.pieces[piece]))
}

br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])
if off > offset {
panic("non-aligned offset todo")
}
br := bytes.NewReader(mgr.pieces[piece][:size])

return struct {
io.ReadCloser
Expand Down
24 changes: 11 additions & 13 deletions extern/storage-sealing/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,32 +315,29 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
m.inputLk.Unlock()

// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
if res.err == nil {
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
// if there was an error waiting for a pre-existing add piece call, let's retry
m.inputLk.Lock()
}

// addPendingPiece takes over m.inputLk
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()

res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}

// called with m.inputLk; transfers the lock to another goroutine!
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
Expand All @@ -357,6 +354,7 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz

m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
Expand Down
27 changes: 16 additions & 11 deletions itests/batch_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/stretchr/testify/require"
)

func TestBatchDealInput(t *testing.T) {
t.Skip("this test is disabled as it's flaky: #4611")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't flake in ~30 runs

kit.QuietMiningLogs()

var (
Expand Down Expand Up @@ -47,17 +49,20 @@ func TestBatchDealInput(t *testing.T) {
})),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 3,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,
}, nil
sc := modules.ToSealingConfig(config.DefaultStorageMiner())
sc.MaxWaitDealsSectors = 2
sc.MaxSealingSectors = 1
sc.MaxSealingSectorsForDeals = 3
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour
sc.BatchPreCommits = false
sc.AggregateCommits = false

return sc, nil
}, nil
}),
))
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts)
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner, miner)

Expand Down Expand Up @@ -126,9 +131,9 @@ func TestBatchDealInput(t *testing.T) {
t.Run("4-p513B", run(513, 4, 2))
if !testing.Short() {
t.Run("32-p257B", run(257, 32, 8))
t.Run("32-p10B", run(10, 32, 2))

// fixme: this appears to break data-transfer / markets in some really creative ways
//t.Run("32-p10B", run(10, 32, 2))
// t.Run("128-p10B", run(10, 128, 8))
}
}
50 changes: 27 additions & 23 deletions itests/sector_miner_collateral_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
Expand All @@ -40,29 +42,31 @@ func TestMinerBalanceCollateral(t *testing.T) {
opts := kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 4,
MaxSealingSectors: 4,
MaxSealingSectorsForDeals: 4,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,

BatchPreCommits: batching,
AggregateCommits: batching,

PreCommitBatchWait: time.Hour,
CommitBatchWait: time.Hour,

MinCommitBatch: nSectors,
MaxPreCommitBatch: nSectors,
MaxCommitBatch: nSectors,

CollateralFromMinerBalance: enabled,
AvailableBalanceBuffer: big.Zero(),
DisableCollateralFallback: false,
AggregateAboveBaseFee: big.Zero(),
BatchPreCommitAboveBaseFee: big.Zero(),
}, nil
sc := modules.ToSealingConfig(config.DefaultStorageMiner())

sc.MaxWaitDealsSectors = 4
sc.MaxSealingSectors = 4
sc.MaxSealingSectorsForDeals = 4
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour

sc.BatchPreCommits = batching
sc.AggregateCommits = batching

sc.PreCommitBatchWait = time.Hour
sc.CommitBatchWait = time.Hour

sc.MinCommitBatch = nSectors
sc.MaxPreCommitBatch = nSectors
sc.MaxCommitBatch = nSectors

sc.CollateralFromMinerBalance = enabled
sc.AvailableBalanceBuffer = big.Zero()
sc.DisableCollateralFallback = false
sc.AggregateAboveBaseFee = big.Zero()
sc.BatchPreCommitAboveBaseFee = big.Zero()

return sc, nil
}, nil
})),
)
Expand Down