Skip to content

Commit

Permalink
Merge pull request #6653 from filecoin-project/fix/finalize-in-storage
Browse files Browse the repository at this point in the history
storage: Fix FinalizeSector with sectors in stoage paths
  • Loading branch information
magik6k authored Jul 2, 2021
2 parents bcb839a + 8a94ab6 commit e2f48b2
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 29 deletions.
5 changes: 5 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,11 @@ workflows:
suite: itest-sdr_upgrade
target: "./itests/sdr_upgrade_test.go"

- test:
name: test-itest-sector_finalize_early
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"

- test:
name: test-itest-sector_pledge
suite: itest-sector_pledge
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-seal-worker/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
}

if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
return xerrors.Errorf("must specify at least one of --store or --seal")
}

b, err := json.MarshalIndent(cfg, "", " ")
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-storage-miner/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ over time
}

if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
return xerrors.Errorf("must specify at least one of --store or --seal")
}

b, err := json.MarshalIndent(cfg, "", " ")
Expand Down
17 changes: 16 additions & 1 deletion extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}

pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}

for _, store := range sealedStores {
if store.CanSeal {
pathType = storiface.PathSealing
break
}
}
}

selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
return err
Expand Down
2 changes: 1 addition & 1 deletion itests/deals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {

kit.QuietMiningLogs()

var blockTime = 1 * time.Second
var blockTime = 50 * time.Millisecond

client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)
Expand Down
47 changes: 46 additions & 1 deletion itests/kit/node_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@ package kit

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/miner"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

// TestMiner represents a miner enrolled in an Ensemble.
Expand Down Expand Up @@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
fmt.Printf("COMMIT BATCH: %+v\n", cb)
}
}

const metaFile = "sectorstore.json"

func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
p, err := ioutil.TempDir("", "lotus-testsectors-")
require.NoError(t, err)

if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
require.NoError(t, err)
}
}

_, err = os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
require.NoError(t, err)
}

cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: weight,
CanSeal: seal,
CanStore: store,
}

if !(cfg.CanStore || cfg.CanSeal) {
t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
}

b, err := json.MarshalIndent(cfg, "", " ")
require.NoError(t, err)

err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
require.NoError(t, err)

err = tm.StorageAddLocal(ctx, p)
require.NoError(t, err)
}
66 changes: 66 additions & 0 deletions itests/sector_finalize_early_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package itests

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)

func TestDealsWithFinalizeEarly(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}

kit.QuietMiningLogs()

var blockTime = 50 * time.Millisecond

client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
cf := config.DefaultStorageMiner()
cf.Sealing.FinalizeEarly = true
return modules.ToSealingConfig(cf), nil
}, nil
})))) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner)

ctx := context.Background()

miner.AddStorage(ctx, t, 1000000000, true, false)
miner.AddStorage(ctx, t, 1000000000, false, true)

sl, err := miner.StorageList(ctx)
require.NoError(t, err)
for si, d := range sl {
i, err := miner.StorageInfo(ctx, si)
require.NoError(t, err)

fmt.Printf("stor d:%d %+v\n", len(d), i)
}

t.Run("single", func(t *testing.T) {
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
})

sl, err = miner.StorageList(ctx)
require.NoError(t, err)
for si, d := range sl {
i, err := miner.StorageInfo(ctx, si)
require.NoError(t, err)

fmt.Printf("stor d:%d %+v\n", len(d), i)
}
}
52 changes: 28 additions & 24 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,33 +882,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
}, nil
}

func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
return sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,

BatchPreCommits: cfg.Sealing.BatchPreCommits,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),

AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),

TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
}
}

func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
return func() (out sealiface.Config, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {
out = sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,

BatchPreCommits: cfg.Sealing.BatchPreCommits,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),

AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),

TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
}
out = ToSealingConfig(cfg)
})
return
}, nil
Expand Down

0 comments on commit e2f48b2

Please sign in to comment.