diff --git a/.circleci/config.yml b/.circleci/config.yml index 593da9cef11..bc108eaa198 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -937,6 +937,11 @@ workflows: suite: itest-nonce target: "./itests/nonce_test.go" + - test: + name: test-itest-path_detach_redeclare + suite: itest-path_detach_redeclare + target: "./itests/path_detach_redeclare_test.go" + - test: name: test-itest-path_type_filters suite: itest-path_type_filters diff --git a/api/api_storage.go b/api/api_storage.go index 3d019bb9e12..ca3905c9559 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -155,6 +155,7 @@ type StorageMiner interface { // paths.SectorIndex StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin + StorageDetach(ctx context.Context, id storiface.ID, url string) error //perm:admin StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) //perm:admin StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error //perm:admin StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error //perm:admin @@ -182,6 +183,10 @@ type StorageMiner interface { StorageAuthVerify(ctx context.Context, token string) ([]auth.Permission, error) //perm:read + StorageAddLocal(ctx context.Context, path string) error //perm:admin + StorageDetachLocal(ctx context.Context, path string) error //perm:admin + StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error //perm:admin + MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write MarketListDeals(ctx context.Context) ([]*MarketDeal, error) //perm:read MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read @@ -279,8 +284,6 @@ type StorageMiner interface { DealsConsiderUnverifiedStorageDeals(context.Context) (bool, error) //perm:admin DealsSetConsiderUnverifiedStorageDeals(context.Context, bool) error //perm:admin - StorageAddLocal(ctx context.Context, path string) error //perm:admin - PiecesListPieces(ctx context.Context) ([]cid.Cid, error) //perm:read PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) //perm:read PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) //perm:read diff --git a/api/api_worker.go b/api/api_worker.go index 18c06ecb94c..77c22a51708 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -59,7 +59,10 @@ type Worker interface { // Storage / Other Remove(ctx context.Context, sector abi.SectorID) error //perm:admin - StorageAddLocal(ctx context.Context, path string) error //perm:admin + StorageLocal(ctx context.Context) (map[storiface.ID]string, error) //perm:admin + StorageAddLocal(ctx context.Context, path string) error //perm:admin + StorageDetachLocal(ctx context.Context, path string) error //perm:admin + StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error //perm:admin // SetEnabled marks the worker as enabled/disabled. 
Not that this setting // may take a few seconds to propagate to task scheduler diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index e6e16986f5b..b2a96f8b71e 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -272,6 +272,8 @@ func init() { Read: [storiface.FileTypes]uint{2, 3, 0}, }, }) + storifaceid := storiface.ID("1399aa04-2625-44b1-bad4-bd07b59b22c4") + addExample(&storifaceid) // worker specific addExample(storiface.AcquireMove) diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 506e615f2d4..70974d40524 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -864,6 +864,10 @@ type StorageMinerStruct struct { StorageDeclareSector func(p0 context.Context, p1 storiface.ID, p2 abi.SectorID, p3 storiface.SectorFileType, p4 bool) error `perm:"admin"` + StorageDetach func(p0 context.Context, p1 storiface.ID, p2 string) error `perm:"admin"` + + StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"` + StorageDropSector func(p0 context.Context, p1 storiface.ID, p2 abi.SectorID, p3 storiface.SectorFileType) error `perm:"admin"` StorageFindSector func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType, p3 abi.SectorSize, p4 bool) ([]storiface.SectorStorageInfo, error) `perm:"admin"` @@ -878,6 +882,8 @@ type StorageMinerStruct struct { StorageLock func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType, p3 storiface.SectorFileType) error `perm:"admin"` + StorageRedeclareLocal func(p0 context.Context, p1 *storiface.ID, p2 bool) error `perm:"admin"` + StorageReportHealth func(p0 context.Context, p1 storiface.ID, p2 storiface.HealthReport) error `perm:"admin"` StorageStat func(p0 context.Context, p1 storiface.ID) (fsutil.FsStat, error) `perm:"admin"` @@ -973,6 +979,12 @@ type WorkerStruct struct { StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"` + StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"` + + StorageLocal func(p0 context.Context) (map[storiface.ID]string, error) `perm:"admin"` + + StorageRedeclareLocal func(p0 context.Context, p1 *storiface.ID, p2 bool) error `perm:"admin"` + TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"` TaskEnable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"` @@ -5104,6 +5116,28 @@ func (s *StorageMinerStub) StorageDeclareSector(p0 context.Context, p1 storiface return ErrNotSupported } +func (s *StorageMinerStruct) StorageDetach(p0 context.Context, p1 storiface.ID, p2 string) error { + if s.Internal.StorageDetach == nil { + return ErrNotSupported + } + return s.Internal.StorageDetach(p0, p1, p2) +} + +func (s *StorageMinerStub) StorageDetach(p0 context.Context, p1 storiface.ID, p2 string) error { + return ErrNotSupported +} + +func (s *StorageMinerStruct) StorageDetachLocal(p0 context.Context, p1 string) error { + if s.Internal.StorageDetachLocal == nil { + return ErrNotSupported + } + return s.Internal.StorageDetachLocal(p0, p1) +} + +func (s *StorageMinerStub) StorageDetachLocal(p0 context.Context, p1 string) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) StorageDropSector(p0 context.Context, p1 storiface.ID, p2 abi.SectorID, p3 storiface.SectorFileType) error { if s.Internal.StorageDropSector == nil { return ErrNotSupported @@ -5181,6 +5215,17 @@ func (s *StorageMinerStub) StorageLock(p0 context.Context, p1 abi.SectorID, p2 s return ErrNotSupported } +func (s *StorageMinerStruct) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error { + if 
s.Internal.StorageRedeclareLocal == nil { + return ErrNotSupported + } + return s.Internal.StorageRedeclareLocal(p0, p1, p2) +} + +func (s *StorageMinerStub) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) StorageReportHealth(p0 context.Context, p1 storiface.ID, p2 storiface.HealthReport) error { if s.Internal.StorageReportHealth == nil { return ErrNotSupported @@ -5610,6 +5655,39 @@ func (s *WorkerStub) StorageAddLocal(p0 context.Context, p1 string) error { return ErrNotSupported } +func (s *WorkerStruct) StorageDetachLocal(p0 context.Context, p1 string) error { + if s.Internal.StorageDetachLocal == nil { + return ErrNotSupported + } + return s.Internal.StorageDetachLocal(p0, p1) +} + +func (s *WorkerStub) StorageDetachLocal(p0 context.Context, p1 string) error { + return ErrNotSupported +} + +func (s *WorkerStruct) StorageLocal(p0 context.Context) (map[storiface.ID]string, error) { + if s.Internal.StorageLocal == nil { + return *new(map[storiface.ID]string), ErrNotSupported + } + return s.Internal.StorageLocal(p0) +} + +func (s *WorkerStub) StorageLocal(p0 context.Context) (map[storiface.ID]string, error) { + return *new(map[storiface.ID]string), ErrNotSupported +} + +func (s *WorkerStruct) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error { + if s.Internal.StorageRedeclareLocal == nil { + return ErrNotSupported + } + return s.Internal.StorageRedeclareLocal(p0, p1, p2) +} + +func (s *WorkerStub) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error { + return ErrNotSupported +} + func (s *WorkerStruct) TaskDisable(p0 context.Context, p1 sealtasks.TaskType) error { if s.Internal.TaskDisable == nil { return ErrNotSupported diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 5f798d94227..39128f2c20d 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 79d54ee3927..d1d18dde6c8 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 158286ef489..9d3021a7a51 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index e1cf8100d6e..9d6af03202d 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-miner/storage.go b/cmd/lotus-miner/storage.go index 035af80c66d..cc5d9453423 100644 --- a/cmd/lotus-miner/storage.go +++ b/cmd/lotus-miner/storage.go @@ -46,6 +46,8 @@ long term for proving (references as 'store') as well as how sectors will be stored while moving through the sealing pipeline (references as 'seal').`, Subcommands: []*cli.Command{ storageAttachCmd, + storageDetachCmd, + storageRedeclareCmd, storageListCmd, storageFindCmd, storageCleanupCmd, @@ -174,6 +176,82 @@ over time }, } +var storageDetachCmd = &cli.Command{ + Name: "detach", + Usage: "detach local storage path", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + }, + }, + ArgsUsage: "[path]", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + if !cctx.Args().Present() { + return xerrors.Errorf("must specify storage path") + } + + p, err 
:= homedir.Expand(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("expanding path: %w", err) + } + + if !cctx.Bool("really-do-it") { + return xerrors.Errorf("pass --really-do-it to execute the action") + } + + return nodeApi.StorageDetachLocal(ctx, p) + }, +} + +var storageRedeclareCmd = &cli.Command{ + Name: "redeclare", + Usage: "redeclare sectors in a local storage path", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "id", + Usage: "storage path ID", + }, + &cli.BoolFlag{ + Name: "all", + Usage: "redeclare all storage paths", + }, + &cli.BoolFlag{ + Name: "drop-missing", + Usage: "Drop index entries with missing files", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + if cctx.IsSet("id") && cctx.Bool("all") { + return xerrors.Errorf("--id and --all can't be passed at the same time") + } + + if cctx.IsSet("id") { + id := storiface.ID(cctx.String("id")) + return nodeApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing")) + } + + if cctx.Bool("all") { + return nodeApi.StorageRedeclareLocal(ctx, nil, cctx.Bool("drop-missing")) + } + + return xerrors.Errorf("either --all or --id must be specified") + }, +} + var storageListCmd = &cli.Command{ Name: "list", Usage: "list local storage paths", diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index 5af63f70c7e..de8e63f1fbf 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -619,7 +619,7 @@ var runCmd = &cli.Command{ if redeclareStorage { log.Info("Redeclaring local storage") - if err := localStore.Redeclare(ctx); err != nil { + if err := localStore.Redeclare(ctx, nil, false); err != nil { log.Errorf("Redeclaring local storage failed: %+v", err) select { diff --git a/cmd/lotus-worker/sealworker/rpc.go b/cmd/lotus-worker/sealworker/rpc.go index 605f8a532f4..fc435487452 100644 --- a/cmd/lotus-worker/sealworker/rpc.go +++ b/cmd/lotus-worker/sealworker/rpc.go @@ -65,6 +65,20 @@ func (w *Worker) Version(context.Context) (api.Version, error) { return api.WorkerAPIVersion0, nil } +func (w *Worker) StorageLocal(ctx context.Context) (map[storiface.ID]string, error) { + l, err := w.LocalStore.Local(ctx) + if err != nil { + return nil, err + } + + out := map[storiface.ID]string{} + for _, st := range l { + out[st.ID] = st.LocalPath + } + + return out, nil +} + func (w *Worker) StorageAddLocal(ctx context.Context, path string) error { path, err := homedir.Expand(path) if err != nil { @@ -84,6 +98,58 @@ func (w *Worker) StorageAddLocal(ctx context.Context, path string) error { return nil } +func (w *Worker) StorageDetachLocal(ctx context.Context, path string) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + // check that we have the path opened + lps, err := w.LocalStore.Local(ctx) + if err != nil { + return xerrors.Errorf("getting local path list: %w", err) + } + + var localPath *storiface.StoragePath + for _, lp := range lps { + if lp.LocalPath == path { + lp := lp // copy to make the linter happy + localPath = &lp + break + } + } + if localPath == nil { + return xerrors.Errorf("no local paths match '%s'", path) + } + + // drop from the persisted storage.json + var found bool + if err := w.Storage.SetStorage(func(sc *paths.StorageConfig) { + out := make([]paths.LocalPath, 0, len(sc.StoragePaths)) + for _, storagePath := range sc.StoragePaths { + if storagePath.Path != path { + 
out = append(out, storagePath) + continue + } + found = true + } + sc.StoragePaths = out + }); err != nil { + return xerrors.Errorf("set storage config: %w", err) + } + if !found { + // maybe this is fine? + return xerrors.Errorf("path not found in storage.json") + } + + // unregister locally, drop from sector index + return w.LocalStore.ClosePath(ctx, localPath.ID) +} + +func (w *Worker) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error { + return w.LocalStore.Redeclare(ctx, id, dropMissing) +} + func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error { disabled := int64(1) if enabled { @@ -123,3 +189,4 @@ func (w *Worker) Shutdown(ctx context.Context) error { } var _ storiface.WorkerCalls = &Worker{} +var _ api.Worker = &Worker{} diff --git a/cmd/lotus-worker/storage.go b/cmd/lotus-worker/storage.go index 2f1cd2f711c..0736ffbfb88 100644 --- a/cmd/lotus-worker/storage.go +++ b/cmd/lotus-worker/storage.go @@ -24,6 +24,8 @@ var storageCmd = &cli.Command{ Usage: "manage sector storage", Subcommands: []*cli.Command{ storageAttachCmd, + storageDetachCmd, + storageRedeclareCmd, }, } @@ -128,3 +130,79 @@ var storageAttachCmd = &cli.Command{ return nodeApi.StorageAddLocal(ctx, p) }, } + +var storageDetachCmd = &cli.Command{ + Name: "detach", + Usage: "detach local storage path", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + }, + }, + ArgsUsage: "[path]", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + if !cctx.Args().Present() { + return xerrors.Errorf("must specify storage path") + } + + p, err := homedir.Expand(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("expanding path: %w", err) + } + + if !cctx.Bool("really-do-it") { + return xerrors.Errorf("pass --really-do-it to execute the action") + } + + return nodeApi.StorageDetachLocal(ctx, p) + }, +} + +var storageRedeclareCmd = &cli.Command{ + Name: "redeclare", + Usage: "redeclare sectors in a local storage path", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "id", + Usage: "storage path ID", + }, + &cli.BoolFlag{ + Name: "all", + Usage: "redeclare all storage paths", + }, + &cli.BoolFlag{ + Name: "drop-missing", + Usage: "Drop index entries with missing files", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + if cctx.IsSet("id") && cctx.Bool("all") { + return xerrors.Errorf("--id and --all can't be passed at the same time") + } + + if cctx.IsSet("id") { + id := storiface.ID(cctx.String("id")) + return nodeApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing")) + } + + if cctx.Bool("all") { + return nodeApi.StorageRedeclareLocal(ctx, nil, cctx.Bool("drop-missing")) + } + + return xerrors.Errorf("either --all or --id must be specified") + }, +} diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 4507e03b873..7a1d4ad9b3a 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -162,6 +162,8 @@ * [StorageAuthVerify](#StorageAuthVerify) * [StorageBestAlloc](#StorageBestAlloc) * [StorageDeclareSector](#StorageDeclareSector) + * [StorageDetach](#StorageDetach) + * [StorageDetachLocal](#StorageDetachLocal) * [StorageDropSector](#StorageDropSector) * [StorageFindSector](#StorageFindSector) * 
[StorageGetLocks](#StorageGetLocks) @@ -169,6 +171,7 @@ * [StorageList](#StorageList) * [StorageLocal](#StorageLocal) * [StorageLock](#StorageLock) + * [StorageRedeclareLocal](#StorageRedeclareLocal) * [StorageReportHealth](#StorageReportHealth) * [StorageStat](#StorageStat) * [StorageTryLock](#StorageTryLock) @@ -3437,6 +3440,35 @@ Inputs: Response: `{}` +### StorageDetach + + +Perms: admin + +Inputs: +```json +[ + "76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8", + "string value" +] +``` + +Response: `{}` + +### StorageDetachLocal + + +Perms: admin + +Inputs: +```json +[ + "string value" +] +``` + +Response: `{}` + ### StorageDropSector @@ -3633,6 +3665,21 @@ Inputs: Response: `{}` +### StorageRedeclareLocal + + +Perms: admin + +Inputs: +```json +[ + "1399aa04-2625-44b1-bad4-bd07b59b22c4", + true +] +``` + +Response: `{}` + ### StorageReportHealth diff --git a/documentation/en/api-v0-methods-worker.md b/documentation/en/api-v0-methods-worker.md index 07f182a95c3..ca98bc4bbe4 100644 --- a/documentation/en/api-v0-methods-worker.md +++ b/documentation/en/api-v0-methods-worker.md @@ -39,6 +39,9 @@ * [SetEnabled](#SetEnabled) * [Storage](#Storage) * [StorageAddLocal](#StorageAddLocal) + * [StorageDetachLocal](#StorageDetachLocal) + * [StorageLocal](#StorageLocal) + * [StorageRedeclareLocal](#StorageRedeclareLocal) * [Task](#Task) * [TaskDisable](#TaskDisable) * [TaskEnable](#TaskEnable) @@ -2118,6 +2121,49 @@ Inputs: Response: `{}` +### StorageDetachLocal + + +Perms: admin + +Inputs: +```json +[ + "string value" +] +``` + +Response: `{}` + +### StorageLocal + + +Perms: admin + +Inputs: `null` + +Response: +```json +{ + "76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8": "/data/path" +} +``` + +### StorageRedeclareLocal + + +Perms: admin + +Inputs: +```json +[ + "1399aa04-2625-44b1-bad4-bd07b59b22c4", + true +] +``` + +Response: `{}` + ## Task diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index cd78e9a02a4..0cdf817494c 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -2151,12 +2151,14 @@ DESCRIPTION: stored while moving through the sealing pipeline (references as 'seal'). COMMANDS: - attach attach local storage path - list list local storage paths - find find sector in the storage system - cleanup trigger cleanup actions - locks show active sector locks - help, h Shows a list of commands or help for one command + attach attach local storage path + detach detach local storage path + redeclare redeclare sectors in a local storage path + list list local storage paths + find find sector in the storage system + cleanup trigger cleanup actions + locks show active sector locks + help, h Shows a list of commands or help for one command OPTIONS: --help, -h show help (default: false) @@ -2202,6 +2204,34 @@ OPTIONS: ``` +### lotus-miner storage detach +``` +NAME: + lotus-miner storage detach - detach local storage path + +USAGE: + lotus-miner storage detach [command options] [path] + +OPTIONS: + --really-do-it (default: false) + +``` + +### lotus-miner storage redeclare +``` +NAME: + lotus-miner storage redeclare - redeclare sectors in a local storage path + +USAGE: + lotus-miner storage redeclare [command options] [arguments...] 
+ +OPTIONS: + --all redeclare all storage paths (default: false) + --drop-missing Drop index entries with missing files (default: false) + --id value storage path ID + +``` + ### lotus-miner storage list ``` NAME: diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index ee5c5c1ded1..68d4b15672b 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -95,8 +95,10 @@ USAGE: lotus-worker storage command [command options] [arguments...] COMMANDS: - attach attach local storage path - help, h Shows a list of commands or help for one command + attach attach local storage path + detach detach local storage path + redeclare redeclare sectors in a local storage path + help, h Shows a list of commands or help for one command OPTIONS: --help, -h show help (default: false) @@ -122,6 +124,34 @@ OPTIONS: ``` +### lotus-worker storage detach +``` +NAME: + lotus-worker storage detach - detach local storage path + +USAGE: + lotus-worker storage detach [command options] [path] + +OPTIONS: + --really-do-it (default: false) + +``` + +### lotus-worker storage redeclare +``` +NAME: + lotus-worker storage redeclare - redeclare sectors in a local storage path + +USAGE: + lotus-worker storage redeclare [command options] [arguments...] + +OPTIONS: + --all redeclare all storage paths (default: false) + --drop-missing Drop index entries with missing files (default: false) + --id value storage path ID + +``` + ## lotus-worker set ``` NAME: diff --git a/itests/batch_deal_test.go b/itests/batch_deal_test.go index 64326c82264..329a6b3bb03 100644 --- a/itests/batch_deal_test.go +++ b/itests/batch_deal_test.go @@ -74,7 +74,7 @@ func TestBatchDealInput(t *testing.T) { require.NoError(t, err) checkNoPadding := func() { - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) sort.Slice(sl, func(i, j int) bool { @@ -125,7 +125,7 @@ func TestBatchDealInput(t *testing.T) { checkNoPadding() - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Equal(t, len(sl), expectSectors) } diff --git a/itests/ccupgrade_test.go b/itests/ccupgrade_test.go index 880c4f30989..aae99c6baf1 100644 --- a/itests/ccupgrade_test.go +++ b/itests/ccupgrade_test.go @@ -60,11 +60,11 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { t.Fatal(err) } - CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner) fmt.Printf("CCUpgrade: %d\n", CCUpgrade) miner.PledgeSectors(ctx, 1, 0, nil) - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") @@ -79,7 +79,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { err = miner.SectorMarkForUpgrade(ctx, sl[0], true) require.NoError(t, err) - sl, err = miner.SectorsList(ctx) + sl, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") diff --git a/itests/deadlines_test.go b/itests/deadlines_test.go index edcabba2152..e76d4dafd53 100644 --- a/itests/deadlines_test.go +++ b/itests/deadlines_test.go @@ -308,7 +308,7 @@ func TestDeadlineToggling(t *testing.T) { // terminate sectors on minerD { var terminationDeclarationParams []miner2.TerminationDeclaration - secs, err := minerD.SectorsList(ctx) + secs, err := 
minerD.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, secs, sectorsD) diff --git a/itests/deals_pricing_test.go b/itests/deals_pricing_test.go index e01f74e0c63..15482f62b6f 100644 --- a/itests/deals_pricing_test.go +++ b/itests/deals_pricing_test.go @@ -70,7 +70,7 @@ func TestQuotePriceForUnsealedRetrieval(t *testing.T) { //stm: @STORAGE_LIST_001, @MINER_SECTOR_LIST_001 ss, err := miner.StorageList(context.Background()) require.NoError(t, err) - _, err = miner.SectorsList(ctx) + _, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) //stm: @STORAGE_DROP_SECTOR_001, @STORAGE_LIST_001 @@ -95,7 +95,7 @@ iLoop: // remove the other unsealed file as well ss, err = miner.StorageList(context.Background()) require.NoError(t, err) - _, err = miner.SectorsList(ctx) + _, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) for storeID, sd := range ss { for _, sector := range sd { diff --git a/itests/kit/deals.go b/itests/kit/deals.go index 55ca4e1b760..794a6380328 100644 --- a/itests/kit/deals.go +++ b/itests/kit/deals.go @@ -292,7 +292,7 @@ func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) { } func (dh *DealHarness) StartSealingWaiting(ctx context.Context) { - snums, err := dh.main.SectorsList(ctx) + snums, err := dh.main.SectorsListNonGenesis(ctx) require.NoError(dh.t, err) for _, snum := range snums { si, err := dh.main.SectorsStatus(ctx, snum, false) diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 434917c54b1..50e72fdd57b 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "encoding/binary" "fmt" "io/ioutil" "net" @@ -20,13 +21,13 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-statestore" - "github.com/filecoin-project/go-storedcounter" miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" power3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/power" @@ -56,6 +57,7 @@ import ( testing2 "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" sectorstorage "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/mock" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -233,11 +235,14 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO } ownerKey := options.ownerKey + var presealSectors int + if !n.bootstrapped { + presealSectors = options.sectors + var ( - sectors = options.sectors - k *types.KeyInfo - genm *genesis.Miner + k *types.KeyInfo + genm *genesis.Miner ) // Will use 2KiB sectors by default (default value of sectorSize). @@ -246,9 +251,9 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO // Create the preseal commitment. 
if n.options.mockProofs { - genm, k, err = mock.PreSeal(proofType, actorAddr, sectors) + genm, k, err = mock.PreSeal(proofType, actorAddr, presealSectors) } else { - genm, k, err = seed.PreSeal(actorAddr, proofType, 0, sectors, tdir, []byte("make genesis mem random"), nil, true) + genm, k, err = seed.PreSeal(actorAddr, proofType, 0, presealSectors, tdir, []byte("make genesis mem random"), nil, true) } require.NoError(n.t, err) @@ -279,6 +284,7 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO OwnerKey: ownerKey, FullNode: full, PresealDir: tdir, + PresealSectors: presealSectors, options: options, RemoteListener: rl, } @@ -535,13 +541,10 @@ func (n *Ensemble) Start() *Ensemble { err = ds.Put(ctx, datastore.NewKey("miner-address"), m.ActorAddr.Bytes()) require.NoError(n.t, err) - nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix)) - for i := 0; i < m.options.sectors; i++ { - _, err := nic.Next() - require.NoError(n.t, err) + if i < len(n.genesis.miners) && !n.bootstrapped { + // if this is a genesis miner, import preseal metadata + require.NoError(n.t, importPreSealMeta(ctx, n.genesis.miners[i], ds)) } - _, err = nic.Next() - require.NoError(n.t, err) // using real proofs, therefore need real sectors. if !n.bootstrapped && !n.options.mockProofs { @@ -913,3 +916,46 @@ func (n *Ensemble) generateGenesis() *genesis.Template { return templ } + +func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.MetadataDS) error { + maxSectorID := abi.SectorNumber(0) + for _, sector := range meta.Sectors { + sectorKey := datastore.NewKey(pipeline.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID)) + + commD := sector.CommD + commR := sector.CommR + + info := &pipeline.SectorInfo{ + State: pipeline.Proving, + SectorNumber: sector.SectorID, + Pieces: []pipeline.Piece{ + { + Piece: abi.PieceInfo{ + Size: abi.PaddedPieceSize(meta.SectorSize), + PieceCID: commD, + }, + DealInfo: nil, // todo: likely possible to get, but not really that useful + }, + }, + CommD: &commD, + CommR: &commR, + } + + b, err := cborutil.Dump(info) + if err != nil { + return err + } + + if err := mds.Put(ctx, sectorKey, b); err != nil { + return err + } + + if sector.SectorID > maxSectorID { + maxSectorID = sector.SectorID + } + } + + buf := make([]byte, binary.MaxVarintLen64) + size := binary.PutUvarint(buf, uint64(maxSectorID)) + return mds.Put(ctx, datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size]) +} diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index 65c7abe3828..c48a09c17ed 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "sort" "strings" "testing" "time" @@ -76,8 +77,9 @@ type TestMiner struct { MineOne func(context.Context, miner.MineReq) error Stop func(context.Context) error - FullNode *TestFullNode - PresealDir string + FullNode *TestFullNode + PresealDir string + PresealSectors int Libp2p struct { PeerID peer.ID @@ -128,9 +130,9 @@ func (tm *TestMiner) StartPledge(ctx context.Context, n, existing int, blockNoti } for { - s, err := tm.StorageMiner.SectorsList(ctx) // Note - the test builder doesn't import genesis sectors into FSM + s, err := tm.SectorsListNonGenesis(ctx) require.NoError(tm.t, err) - fmt.Printf("Sectors: %d\n", len(s)) + fmt.Printf("Sectors: %d (n %d, ex %d)\n", len(s), n, existing) if len(s) >= n+existing { break } @@ -140,7 +142,7 @@ func (tm *TestMiner) StartPledge(ctx context.Context, n, existing int, blockNoti 
fmt.Printf("All sectors is fsm\n") - s, err := tm.StorageMiner.SectorsList(ctx) + s, err := tm.SectorsListNonGenesis(ctx) require.NoError(tm.t, err) toCheck := map[abi.SectorNumber]struct{}{} @@ -205,3 +207,15 @@ func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, conf func(*pa return cfg.ID } +func (tm *TestMiner) SectorsListNonGenesis(ctx context.Context) ([]abi.SectorNumber, error) { + l, err := tm.SectorsList(ctx) + if err != nil { + return nil, err + } + // sort just in case + sort.Slice(l, func(i, j int) bool { + return l[i] < l[j] + }) + + return l[tm.PresealSectors:], nil +} diff --git a/itests/kit/node_worker.go b/itests/kit/node_worker.go index 538c739a7ff..3a6a55c5515 100644 --- a/itests/kit/node_worker.go +++ b/itests/kit/node_worker.go @@ -2,13 +2,21 @@ package kit import ( "context" + "encoding/json" + "io/ioutil" "net" "net/http" + "os" + "path/filepath" "testing" + "github.com/google/uuid" "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) // TestWorker represents a worker enrolled in an Ensemble. @@ -29,3 +37,42 @@ type TestWorker struct { options nodeOpts } + +func (tm *TestWorker) AddStorage(ctx context.Context, t *testing.T, conf func(*paths.LocalStorageMeta)) storiface.ID { + p := t.TempDir() + + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + require.NoError(t, err) + } + } + + _, err := os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + require.NoError(t, err) + } + + cfg := &paths.LocalStorageMeta{ + ID: storiface.ID(uuid.New().String()), + Weight: 10, + CanSeal: false, + CanStore: false, + } + + conf(cfg) + + if !(cfg.CanStore || cfg.CanSeal) { + t.Fatal("must specify at least one of CanStore or cfg.CanSeal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644) + require.NoError(t, err) + + err = tm.StorageAddLocal(ctx, p) + require.NoError(t, err) + + return cfg.ID +} diff --git a/itests/path_detach_redeclare_test.go b/itests/path_detach_redeclare_test.go new file mode 100644 index 00000000000..124266b7d94 --- /dev/null +++ b/itests/path_detach_redeclare_test.go @@ -0,0 +1,413 @@ +package itests + +import ( + "context" + "os" + "os/exec" + "path/filepath" + "testing" + + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/sealtasks" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +func TestPathDetachRedeclare(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = logging.SetLogLevel("storageminer", "INFO") + + var ( + client kit.TestFullNode + miner kit.TestMiner + wiw, wdw kit.TestWorker + ) + ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)). + FullNode(&client, kit.ThroughRPC()). + Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()). + Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})). 
+ Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})). + Start() + + ens.InterconnectAll() + + // check there's only one path + sps, err := miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + + var id storiface.ID + for s := range sps { + id = s + } + + local, err := miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[id]), 1) + + oldLocal := local[id] + + // check sectors + checkSectors(ctx, t, client, miner, 2, 0) + + // detach preseal path + require.NoError(t, miner.StorageDetachLocal(ctx, local[id])) + + // check that there are no paths, post checks fail + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 0) + local, err = miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + checkSectors(ctx, t, client, miner, 2, 2) + + // attach a new path + newId := miner.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) { + cfg.CanStore = true + }) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + local, err = miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[newId]), 1) + + newLocal := local[newId] + + // move sector data to the new path + + // note: dest path already exist so we only want to .Join src + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "sealed"), newLocal).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "cache"), newLocal).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "unsealed"), newLocal).Run()) + + // check that sector files aren't indexed, post checks fail + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 0) + + // redeclare sectors + require.NoError(t, miner.StorageRedeclareLocal(ctx, nil, false)) + + // check that sector files exist, post checks work + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + checkSectors(ctx, t, client, miner, 2, 0) + + // remove one sector, one post check fails + require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "sealed", "s-t01000-0"))) + require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "cache", "s-t01000-0"))) + checkSectors(ctx, t, client, miner, 2, 1) + + // redeclare with no drop, still see sector in the index + require.NoError(t, miner.StorageRedeclareLocal(ctx, nil, false)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + // redeclare with drop, don't see the sector in the index + require.NoError(t, miner.StorageRedeclareLocal(ctx, nil, true)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 1) + require.Equal(t, abi.SectorNumber(1), sps[newId][0].SectorID.Number) +} + +func TestPathDetachRedeclareWorker(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = logging.SetLogLevel("storageminer", "INFO") + + var ( + client kit.TestFullNode + miner kit.TestMiner + wiw, wdw, sealw kit.TestWorker + ) + ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)). + FullNode(&client, kit.ThroughRPC()). + Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()). 
+ Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})). + Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})). + Worker(&miner, &sealw, kit.ThroughRPC(), kit.NoStorage(), kit.WithSealWorkerTasks). + Start() + + ens.InterconnectAll() + + // check there's only one path on the miner, none on the worker + sps, err := miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + + var id storiface.ID + for s := range sps { + id = s + } + + local, err := miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[id]), 1) + + oldLocal := local[id] + + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + // check sectors + checkSectors(ctx, t, client, miner, 2, 0) + + // detach preseal path from the miner + require.NoError(t, miner.StorageDetachLocal(ctx, oldLocal)) + + // check that there are no paths, post checks fail + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 0) + local, err = miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + checkSectors(ctx, t, client, miner, 2, 2) + + // attach a new path + newId := sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) { + cfg.CanStore = true + }) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[newId]), 1) + + newLocalTemp := local[newId] + + // move sector data to the new path + + // note: dest path already exist so we only want to .Join src + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "sealed"), newLocalTemp).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "cache"), newLocalTemp).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "unsealed"), newLocalTemp).Run()) + + // check that sector files aren't indexed, post checks fail + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 0) + + // redeclare sectors + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false)) + + // check that sector files exist, post checks work + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + checkSectors(ctx, t, client, miner, 2, 0) + + // drop the path from the worker + require.NoError(t, sealw.StorageDetachLocal(ctx, newLocalTemp)) + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + // add a new one again, and move the sectors there + newId = sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) { + cfg.CanStore = true + }) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[newId]), 1) + + newLocal := local[newId] + + // move sector data to the new path + + // note: dest path already exist so we only want to .Join src + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "sealed"), newLocal).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "cache"), newLocal).Run()) + 
require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "unsealed"), newLocal).Run()) + + // redeclare sectors + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false)) + + // check that sector files exist, post checks work + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + checkSectors(ctx, t, client, miner, 2, 0) + + // remove one sector, one check fails + require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "sealed", "s-t01000-0"))) + require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "cache", "s-t01000-0"))) + checkSectors(ctx, t, client, miner, 2, 1) + + // redeclare with no drop, still see sector in the index + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + // redeclare with drop, don't see the sector in the index + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, true)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 1) + require.Equal(t, abi.SectorNumber(1), sps[newId][0].SectorID.Number) +} + +func TestPathDetachShared(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = logging.SetLogLevel("storageminer", "INFO") + + var ( + client kit.TestFullNode + miner kit.TestMiner + wiw, wdw, sealw kit.TestWorker + ) + ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)). + FullNode(&client, kit.ThroughRPC()). + Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()). + Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})). + Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})). + Worker(&miner, &sealw, kit.ThroughRPC(), kit.NoStorage(), kit.WithSealWorkerTasks). 
+ Start() + + ens.InterconnectAll() + + // check there's only one path on the miner, none on the worker + sps, err := miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + + var id storiface.ID + for s := range sps { + id = s + } + + // check that there's only one URL for the path (provided by the miner node) + si, err := miner.StorageInfo(ctx, id) + require.NoError(t, err) + require.Len(t, si.URLs, 1) + + local, err := miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[id]), 1) + + minerLocal := local[id] + + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + // share the genesis sector path with the worker + require.NoError(t, sealw.StorageAddLocal(ctx, minerLocal)) + + // still just one path, but accessible from two places + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + + // should see 2 urls now + si, err = miner.StorageInfo(ctx, id) + require.NoError(t, err) + require.Len(t, si.URLs, 2) + + // drop the path from the worker + require.NoError(t, sealw.StorageDetachLocal(ctx, minerLocal)) + + // the path is still registered + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + + // but with just one URL (the miner) + si, err = miner.StorageInfo(ctx, id) + require.NoError(t, err) + require.Len(t, si.URLs, 1) + + // now also drop from the miner and check that the path is gone + require.NoError(t, miner.StorageDetachLocal(ctx, minerLocal)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 0) +} + +func checkSectors(ctx context.Context, t *testing.T, api kit.TestFullNode, miner kit.TestMiner, expectChecked, expectBad int) { + addr, err := miner.ActorAddress(ctx) + require.NoError(t, err) + + mid, err := address.IDFromAddress(addr) + require.NoError(t, err) + + info, err := api.StateMinerInfo(ctx, addr, types.EmptyTSK) + require.NoError(t, err) + + partitions, err := api.StateMinerPartitions(ctx, addr, 0, types.EmptyTSK) + require.NoError(t, err) + par := partitions[0] + + sectorInfos, err := api.StateMinerSectors(ctx, addr, &par.LiveSectors, types.EmptyTSK) + require.NoError(t, err) + + var tocheck []storiface.SectorRef + for _, info := range sectorInfos { + si := abi.SectorID{ + Miner: abi.ActorID(mid), + Number: info.SectorNumber, + } + + tocheck = append(tocheck, storiface.SectorRef{ + ProofType: info.SealProof, + ID: si, + }) + } + + require.Len(t, tocheck, expectChecked) + + bad, err := miner.CheckProvable(ctx, info.WindowPoStProofType, tocheck, true) + require.NoError(t, err) + require.Len(t, bad, expectBad) +} diff --git a/itests/sdr_upgrade_test.go b/itests/sdr_upgrade_test.go index a90d0bebed9..493dc12241c 100644 --- a/itests/sdr_upgrade_test.go +++ b/itests/sdr_upgrade_test.go @@ -95,7 +95,7 @@ func TestSDRUpgrade(t *testing.T) { // before.
miner.PledgeSectors(ctx, 9, 0, pledge) - s, err := miner.SectorsList(ctx) + s, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) sort.Slice(s, func(i, j int) bool { return s[i] < s[j] diff --git a/itests/sector_make_cc_avail_test.go b/itests/sector_make_cc_avail_test.go index 437e803b053..c6ed4b36d71 100644 --- a/itests/sector_make_cc_avail_test.go +++ b/itests/sector_make_cc_avail_test.go @@ -34,10 +34,10 @@ func TestMakeAvailable(t *testing.T) { t.Fatal(err) } - CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner) miner.PledgeSectors(ctx, 1, 0, nil) - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") @@ -48,7 +48,7 @@ func TestMakeAvailable(t *testing.T) { } client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) - sl, err = miner.SectorsList(ctx) + sl, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") @@ -64,7 +64,7 @@ func TestMakeAvailable(t *testing.T) { outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false) kit.AssertFilesEqual(t, inPath, outPath) - sl, err = miner.SectorsList(ctx) + sl, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") diff --git a/itests/sector_miner_collateral_test.go b/itests/sector_miner_collateral_test.go index 07dad8edd74..b722dae6538 100644 --- a/itests/sector_miner_collateral_test.go +++ b/itests/sector_miner_collateral_test.go @@ -96,7 +96,7 @@ func TestMinerBalanceCollateral(t *testing.T) { } // check that sector messages had zero value set - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) for _, number := range sl { diff --git a/itests/sector_prefer_no_upgrade_test.go b/itests/sector_prefer_no_upgrade_test.go index 30674b36e38..0294899d9ad 100644 --- a/itests/sector_prefer_no_upgrade_test.go +++ b/itests/sector_prefer_no_upgrade_test.go @@ -34,12 +34,12 @@ func TestPreferNoUpgrade(t *testing.T) { t.Fatal(err) } - CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) - Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 2) + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner) + Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) { miner.PledgeSectors(ctx, 1, 0, nil) - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") @@ -53,7 +53,7 @@ func TestPreferNoUpgrade(t *testing.T) { err = miner.SectorMarkForUpgrade(ctx, sl[0], true) require.NoError(t, err) - sl, err = miner.SectorsList(ctx) + sl, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") } @@ -68,7 +68,7 @@ func TestPreferNoUpgrade(t *testing.T) { kit.AssertFilesEqual(t, inPath, outPath) } - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 2, "expected 2 sectors") diff --git a/itests/sector_revert_available_test.go b/itests/sector_revert_available_test.go index 559e8e8b050..99d410e9537 100644 --- a/itests/sector_revert_available_test.go +++ b/itests/sector_revert_available_test.go @@ -31,11 +31,11 @@ func 
TestAbortUpgradeAvailable(t *testing.T) { t.Fatal(err) } - CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner) fmt.Printf("CCUpgrade: %d\n", CCUpgrade) miner.PledgeSectors(ctx, 1, 0, nil) - sl, err := miner.SectorsList(ctx) + sl, err := miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") @@ -49,7 +49,7 @@ func TestAbortUpgradeAvailable(t *testing.T) { err = miner.SectorMarkForUpgrade(ctx, sl[0], true) require.NoError(t, err) - sl, err = miner.SectorsList(ctx) + sl, err = miner.SectorsListNonGenesis(ctx) require.NoError(t, err) require.Len(t, sl, 1, "expected 1 sector") diff --git a/itests/wdpost_dispute_test.go b/itests/wdpost_dispute_test.go index f26adb39b56..0982f44c876 100644 --- a/itests/wdpost_dispute_test.go +++ b/itests/wdpost_dispute_test.go @@ -94,7 +94,7 @@ func TestWindowPostDispute(t *testing.T) { require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz))) //stm: @MINER_SECTOR_LIST_001 - evilSectors, err := evilMiner.SectorsList(ctx) + evilSectors, err := evilMiner.SectorsListNonGenesis(ctx) require.NoError(t, err) evilSectorNo := evilSectors[0] // only one. //stm: @CHAIN_STATE_SECTOR_PARTITION_001 diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 7f595af08d0..74f4429efc4 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -1213,6 +1213,22 @@ func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) err return sm.StorageMgr.AddLocalStorage(ctx, path) } +func (sm *StorageMinerAPI) StorageDetachLocal(ctx context.Context, path string) error { + if sm.StorageMgr == nil { + return xerrors.Errorf("no storage manager") + } + + return sm.StorageMgr.DetachLocalStorage(ctx, path) +} + +func (sm *StorageMinerAPI) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error { + if sm.StorageMgr == nil { + return xerrors.Errorf("no storage manager") + } + + return sm.StorageMgr.RedeclareLocalStorage(ctx, id, dropMissing) +} + func (sm *StorageMinerAPI) PiecesListPieces(ctx context.Context) ([]cid.Cid, error) { return sm.PieceStore.ListPieceInfoKeys() } diff --git a/storage/paths/index.go b/storage/paths/index.go index 97371a87551..ba387a3f768 100644 --- a/storage/paths/index.go +++ b/storage/paths/index.go @@ -30,6 +30,7 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5 type SectorIndex interface { // part of storage-miner api StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error + StorageDetach(ctx context.Context, id storiface.ID, url string) error StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error @@ -217,6 +218,94 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st return nil } +func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error { + i.lk.Lock() + defer i.lk.Unlock() + + // ent: *storageEntry + ent, ok := i.stores[id] + if !ok { + return xerrors.Errorf("storage '%s' isn't registered", id) + } + + // check if this is the only path provider/url for this pathID + drop := true + if len(ent.info.URLs) > 0 { + drop = len(ent.info.URLs) == 1 // only one url + + if drop && ent.info.URLs[0] != url { + return xerrors.Errorf("not dropping path, requested and index urls don't match ('%s' != '%s')", url, ent.info.URLs[0]) + } 
+ } + + if drop { + if a, hasAlert := i.pathAlerts[id]; hasAlert && i.alerting != nil { + if i.alerting.IsRaised(a) { + i.alerting.Resolve(a, map[string]string{ + "message": "path detached", + }) + } + delete(i.pathAlerts, id) + } + + // stats + var droppedEntries, primaryEntries, droppedDecls int + + // drop declarations + for decl, dms := range i.sectors { + var match bool + for _, dm := range dms { + if dm.storage == id { + match = true + droppedEntries++ + if dm.primary { + primaryEntries++ + } + break + } + } + + // if no entries match, nothing to do here + if !match { + continue + } + + // if there's a match, and only one entry, drop the whole declaration + if len(dms) <= 1 { + delete(i.sectors, decl) + droppedDecls++ + continue + } + + // rewrite entries with the path we're dropping filtered out + filtered := make([]*declMeta, 0, len(dms)-1) + for _, dm := range dms { + if dm.storage != id { + filtered = append(filtered, dm) + } + } + + i.sectors[decl] = filtered + } + + delete(i.stores, id) + + log.Warnw("Dropping sector storage", "path", id, "url", url, "droppedEntries", droppedEntries, "droppedPrimaryEntries", primaryEntries, "droppedDecls", droppedDecls) + } else { + newUrls := make([]string, 0, len(ent.info.URLs)) + for _, u := range ent.info.URLs { + if u != url { + newUrls = append(newUrls, u) + } + } + ent.info.URLs = newUrls + + log.Warnw("Dropping sector path endpoint", "path", id, "url", url) + } + + return nil +} + func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { i.lk.Lock() defer i.lk.Unlock() diff --git a/storage/paths/local.go b/storage/paths/local.go index 698da1e7503..552b1738df3 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -218,6 +218,10 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err) } + if p, exists := st.paths[meta.ID]; exists { + return xerrors.Errorf("path with ID %s already opened: '%s'", meta.ID, p.local) + } + // TODO: Check existing / dedupe out := &path{ @@ -249,7 +253,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { return xerrors.Errorf("declaring storage in index: %w", err) } - if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore); err != nil { + if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore, false); err != nil { return err } @@ -258,6 +262,25 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { return nil } +func (st *Local) ClosePath(ctx context.Context, id storiface.ID) error { + st.localLk.Lock() + defer st.localLk.Unlock() + + if _, exists := st.paths[id]; !exists { + return xerrors.Errorf("path with ID %s isn't opened", id) + } + + for _, url := range st.urls { + if err := st.index.StorageDetach(ctx, id, url); err != nil { + return xerrors.Errorf("dropping path (id='%s' url='%s'): %w", id, url, err) + } + } + + delete(st.paths, id) + + return nil +} + func (st *Local) open(ctx context.Context) error { cfg, err := st.localStorage.GetStorage() if err != nil { @@ -276,7 +299,7 @@ func (st *Local) open(ctx context.Context) error { return nil } -func (st *Local) Redeclare(ctx context.Context) error { +func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMissingDecls bool) error { st.localLk.Lock() defer st.localLk.Unlock() @@ -300,6 +323,9 @@ func (st *Local) Redeclare(ctx context.Context) error { log.Errorf("storage path ID changed: %s; %s -> %s", p.local, id, meta.ID) continue } + if filterId 
!= nil && *filterId != id { + continue + } err = st.index.StorageAttach(ctx, storiface.StorageInfo{ ID: id, @@ -317,7 +343,7 @@ func (st *Local) Redeclare(ctx context.Context) error { return xerrors.Errorf("redeclaring storage in index: %w", err) } - if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore); err != nil { + if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore, dropMissingDecls); err != nil { return xerrors.Errorf("redeclaring sectors: %w", err) } } @@ -325,7 +351,24 @@ func (st *Local) Redeclare(ctx context.Context) error { return nil } -func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, primary bool) error { +func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, primary, dropMissing bool) error { + indexed := map[storiface.Decl]struct{}{} + if dropMissing { + decls, err := st.index.StorageList(ctx) + if err != nil { + return xerrors.Errorf("getting declaration list: %w", err) + } + + for _, decl := range decls[id] { + for _, fileType := range decl.SectorFileType.AllSet() { + indexed[storiface.Decl{ + SectorID: decl.SectorID, + SectorFileType: fileType, + }] = struct{}{} + } + } + } + for _, t := range storiface.PathTypes { ents, err := ioutil.ReadDir(filepath.Join(p, t.String())) if err != nil { @@ -349,12 +392,29 @@ func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) } + delete(indexed, storiface.Decl{ + SectorID: sid, + SectorFileType: t, + }) + if err := st.index.StorageDeclareSector(ctx, id, sid, t, primary); err != nil { return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, id, err) } } } + if len(indexed) > 0 { + log.Warnw("index contains sectors which are missing in the storage path", "count", len(indexed), "dropMissing", dropMissing) + } + + if dropMissing { + for decl := range indexed { + if err := st.index.StorageDropSector(ctx, id, decl.SectorID, decl.SectorFileType); err != nil { + return xerrors.Errorf("dropping sector %v from index: %w", decl, err) + } + } + } + return nil } diff --git a/storage/paths/mocks/index.go b/storage/paths/mocks/index.go index 030692b8f5f..6fdcb03b9ec 100644 --- a/storage/paths/mocks/index.go +++ b/storage/paths/mocks/index.go @@ -82,6 +82,20 @@ func (mr *MockSectorIndexMockRecorder) StorageDeclareSector(arg0, arg1, arg2, ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDeclareSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDeclareSector), arg0, arg1, arg2, arg3, arg4) } +// StorageDetach mocks base method. +func (m *MockSectorIndex) StorageDetach(arg0 context.Context, arg1 storiface.ID, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StorageDetach", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// StorageDetach indicates an expected call of StorageDetach. +func (mr *MockSectorIndexMockRecorder) StorageDetach(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDetach", reflect.TypeOf((*MockSectorIndex)(nil).StorageDetach), arg0, arg1, arg2) +} + // StorageDropSector mocks base method. 
func (m *MockSectorIndex) StorageDropSector(arg0 context.Context, arg1 storiface.ID, arg2 abi.SectorID, arg3 storiface.SectorFileType) error { m.ctrl.T.Helper() diff --git a/storage/sealer/manager.go b/storage/sealer/manager.go index 68eae077cd7..6733674ffa0 100644 --- a/storage/sealer/manager.go +++ b/storage/sealer/manager.go @@ -238,6 +238,58 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error { return nil } +func (m *Manager) DetachLocalStorage(ctx context.Context, path string) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + // check that we have the path opened + lps, err := m.localStore.Local(ctx) + if err != nil { + return xerrors.Errorf("getting local path list: %w", err) + } + + var localPath *storiface.StoragePath + for _, lp := range lps { + if lp.LocalPath == path { + lp := lp // copy to make the linter happy + localPath = &lp + break + } + } + if localPath == nil { + return xerrors.Errorf("no local paths match '%s'", path) + } + + // drop from the persisted storage.json + var found bool + if err := m.ls.SetStorage(func(sc *paths.StorageConfig) { + out := make([]paths.LocalPath, 0, len(sc.StoragePaths)) + for _, storagePath := range sc.StoragePaths { + if storagePath.Path != path { + out = append(out, storagePath) + continue + } + found = true + } + sc.StoragePaths = out + }); err != nil { + return xerrors.Errorf("set storage config: %w", err) + } + if !found { + // maybe this is fine? + return xerrors.Errorf("path not found in storage.json") + } + + // unregister locally, drop from sector index + return m.localStore.ClosePath(ctx, localPath.ID) +} + +func (m *Manager) RedeclareLocalStorage(ctx context.Context, id *storiface.ID, dropMissing bool) error { + return m.localStore.Redeclare(ctx, id, dropMissing) +} + func (m *Manager) AddWorker(ctx context.Context, w Worker) error { sessID, err := w.Session(ctx) if err != nil { diff --git a/storage/sealer/storiface/filetype.go b/storage/sealer/storiface/filetype.go index 5aca9f4db7c..4660dd2a7fc 100644 --- a/storage/sealer/storiface/filetype.go +++ b/storage/sealer/storiface/filetype.go @@ -99,6 +99,18 @@ func (t SectorFileType) Strings() []string { return out } +func (t SectorFileType) AllSet() []SectorFileType { + var out []SectorFileType + for _, fileType := range PathTypes { + if fileType&t == 0 { + continue + } + + out = append(out, fileType) + } + return out +} + func (t SectorFileType) Has(singleType SectorFileType) bool { return t&singleType == singleType }
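The patch ends with the new `SectorFileType.AllSet` helper, which `declareSectors` uses to expand a combined declaration bitmask into per-type index entries before deciding which stale entries to drop. Below is a minimal standalone sketch (not part of the patch) of that decomposition; the package layout and the `1 << iota` constant values mirroring storiface's FTUnsealed/FTSealed/FTCache flags are assumptions for illustration only.

```go
// Standalone sketch: how AllSet splits a combined SectorFileType bitmask.
// Constant values assume storiface's 1<<iota layout (FTUnsealed=1, FTSealed=2, FTCache=4).
package main

import "fmt"

type SectorFileType int

const (
	FTUnsealed SectorFileType = 1 << iota
	FTSealed
	FTCache
)

var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache}

// AllSet returns every individual file type present in the combined mask.
func (t SectorFileType) AllSet() []SectorFileType {
	var out []SectorFileType
	for _, fileType := range PathTypes {
		if fileType&t == 0 {
			continue
		}
		out = append(out, fileType)
	}
	return out
}

func main() {
	// A declaration covering sealed+cache expands into two index entries,
	// which is what the dropMissing bookkeeping in declareSectors tracks.
	fmt.Println((FTSealed | FTCache).AllSet()) // [2 4]
}
```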
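More broadly, the new miner API surface (`StorageDetachLocal`, `StorageRedeclareLocal`, together with the existing `StorageLocal`) is what the `lotus-miner storage detach` and `storage redeclare` commands call under the hood. The sketch below shows how the same flow might be driven programmatically; it is not part of the patch, `migrateStoragePath` and the `storageops` package name are illustrative, and the caller is assumed to already hold a connected `api.StorageMiner` RPC client.

```go
// Hedged sketch (not part of the patch): drive detach + redeclare through the
// api.StorageMiner methods added/used in this change. The caller supplies an
// already-connected RPC client; error handling is kept minimal.
package storageops

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/api"
)

// migrateStoragePath detaches oldPath from the miner's local storage and then
// re-indexes the remaining local paths, dropping index entries whose files are
// missing (mirroring `lotus-miner storage redeclare --all --drop-missing`).
func migrateStoragePath(ctx context.Context, minerApi api.StorageMiner, oldPath string) error {
	local, err := minerApi.StorageLocal(ctx)
	if err != nil {
		return fmt.Errorf("listing local paths: %w", err)
	}
	fmt.Printf("local paths before detach: %d\n", len(local))

	if err := minerApi.StorageDetachLocal(ctx, oldPath); err != nil {
		return fmt.Errorf("detaching %s: %w", oldPath, err)
	}

	// nil ID means "all local paths"; true drops index entries with missing files.
	return minerApi.StorageRedeclareLocal(ctx, nil, true)
}
```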