From bd71bf2379287fc2403b32622d362b2da5dc251d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Nov 2020 16:47:44 +0100 Subject: [PATCH 1/5] Expand sched-diag --- extern/sector-storage/manager.go | 44 +++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 5e86cb4d0eb..5ba90c46041 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -688,7 +688,49 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err } } - return m.sched.Info(ctx) + si, err := m.sched.Info(ctx) + if err != nil { + return nil, err + } + + type SchedInfo interface{} + i := struct{ + SchedInfo + + ReturnedWork []string + Waiting []string + + CallToWork map[string]string + + EarlyRet []string + + }{ + SchedInfo: si, + + CallToWork: map[string]string{}, + } + + m.workLk.Lock() + + for w := range m.results { + i.ReturnedWork = append(i.ReturnedWork, w.String()) + } + + for id := range m.callRes { + i.EarlyRet = append(i.EarlyRet, id.String()) + } + + for w := range m.waitRes { + i.Waiting = append(i.Waiting, w.String()) + } + + for c, w := range m.callToWork { + i.CallToWork[c.String()] = w.String() + } + + m.workLk.Unlock() + + return i, nil } func (m *Manager) Close(ctx context.Context) error { From 2a3d9309335ce99e12d8185902fbb97435a06a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Nov 2020 16:48:04 +0100 Subject: [PATCH 2/5] Hide ret-done tasks in sealing jobs by default --- cmd/lotus-storage-miner/sealing.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index faaab7b01f9..17d33958909 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -127,6 +127,10 @@ var sealingJobsCmd = &cli.Command{ Usage: "list workers", Flags: []cli.Flag{ &cli.BoolFlag{Name: "color"}, + &cli.BoolFlag{ + Name: "show-ret-done", + Usage: "show returned but not consumed calls", + }, }, Action: func(cctx *cli.Context) error { color.NoColor = !cctx.Bool("color") @@ -191,6 +195,9 @@ var sealingJobsCmd = &cli.Command{ case l.RunWait > 0: state = fmt.Sprintf("assigned(%d)", l.RunWait-1) case l.RunWait == storiface.RWRetDone: + if !cctx.Bool("show-ret-done") { + continue + } state = "ret-done" case l.RunWait == storiface.RWReturned: state = "returned" From 09f9f871a37649688277ae1fbcd0be136d07af0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Nov 2020 17:39:12 +0100 Subject: [PATCH 3/5] Create a command to abort sealing calls --- api/api_storage.go | 1 + api/apistruct/struct.go | 5 ++ cmd/lotus-storage-miner/sealing.go | 50 ++++++++++++++++++-- extern/sector-storage/manager.go | 24 +++++++--- extern/sector-storage/manager_calltracker.go | 4 ++ node/impl/storminer.go | 4 ++ 6 files changed, 79 insertions(+), 9 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index d003ec776b6..738a05e095d 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -71,6 +71,7 @@ type StorageMiner interface { // SealingSchedDiag dumps internal sealing scheduler state SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) + SealingAbort(ctx context.Context, call storiface.CallID) error stores.SectorIndex diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 8db09af7102..16e93be5eaf 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -324,6 +324,7 @@ type StorageMinerStruct struct { ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"` + SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"` StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"` StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` @@ -1318,6 +1319,10 @@ func (c *StorageMinerStruct) SealingSchedDiag(ctx context.Context, doSched bool) return c.Internal.SealingSchedDiag(ctx, doSched) } +func (c *StorageMinerStruct) SealingAbort(ctx context.Context, call storiface.CallID) error { + return c.Internal.SealingAbort(ctx, call) +} + func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.StorageInfo, st fsutil.FsStat) error { return c.Internal.StorageAttach(ctx, si, st) } diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 17d33958909..6d972a11cae 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -28,6 +28,7 @@ var sealingCmd = &cli.Command{ sealingJobsCmd, sealingWorkersCmd, sealingSchedDiagCmd, + sealingAbortCmd, }, } @@ -124,7 +125,7 @@ var sealingWorkersCmd = &cli.Command{ var sealingJobsCmd = &cli.Command{ Name: "jobs", - Usage: "list workers", + Usage: "list running jobs", Flags: []cli.Flag{ &cli.BoolFlag{Name: "color"}, &cli.BoolFlag{ @@ -215,9 +216,9 @@ var sealingJobsCmd = &cli.Command{ } _, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n", - hex.EncodeToString(l.ID.ID[10:]), + hex.EncodeToString(l.ID.ID[:4]), l.Sector.Number, - hex.EncodeToString(l.wid[5:]), + hex.EncodeToString(l.wid[:4]), hostname, l.Task.Short(), state, @@ -260,3 +261,46 @@ var sealingSchedDiagCmd = &cli.Command{ return nil }, } + +var sealingAbortCmd = &cli.Command{ + Name: "abort", + Usage: "Abort a running job", + ArgsUsage: "[call id]", + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() != 1 { + return xerrors.Errorf("expected 1 argument") + } + + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + jobs, err := nodeApi.WorkerJobs(ctx) + if err != nil { + return xerrors.Errorf("getting worker jobs: %w", err) + } + + var job *storiface.WorkerJob + outer: + for _, workerJobs := range jobs { + for _, j := range workerJobs { + if strings.HasPrefix(j.ID.ID.String(), cctx.Args().First()) { + job = &j + break outer + } + } + } + + if job == nil { + return xerrors.Errorf("job with specified id prefix not found") + } + + fmt.Printf("aborting job %s, task %s, sector %d, running on host %s\n", job.ID.String(), job.Task.Short(), job.Sector.Number, job.Hostname) + + return nodeApi.SealingAbort(ctx, job.ID) + }, +} diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 5ba90c46041..8e7d8c4e8aa 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -220,7 +220,9 @@ func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storifac if err != nil { return err } - *rok = r.(bool) + if r != nil { + *rok = r.(bool) + } return nil } } @@ -342,7 +344,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie if err != nil { return err } - out = p.(abi.PieceInfo) + if p != nil { + out = p.(abi.PieceInfo) + } return nil }) @@ -366,7 +370,9 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke waitErr = werr return } - out = p.(storage.PreCommit1Out) + if p != nil { + out = p.(storage.PreCommit1Out) + } } if wait { // already in progress @@ -415,7 +421,9 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase waitErr = werr return } - out = p.(storage.SectorCids) + if p != nil { + out = p.(storage.SectorCids) + } } if wait { // already in progress @@ -462,7 +470,9 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a waitErr = werr return } - out = p.(storage.Commit1Out) + if p != nil { + out = p.(storage.Commit1Out) + } } if wait { // already in progress @@ -509,7 +519,9 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou waitErr = werr return } - out = p.(storage.Proof) + if p != nil { + out = p.(storage.Proof) + } } if wait { // already in progress diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go index e6182adb0f9..9b39a5a7046 100644 --- a/extern/sector-storage/manager_calltracker.go +++ b/extern/sector-storage/manager_calltracker.go @@ -414,3 +414,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri return nil } + +func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error { + return m.returnResult(call, nil, "task aborted") +} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a58621c97f5..b90734e3c7b 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -300,6 +300,10 @@ func (sm *StorageMinerAPI) SealingSchedDiag(ctx context.Context, doSched bool) ( return sm.StorageMgr.SchedDiag(ctx, doSched) } +func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.CallID) error { + return sm.StorageMgr.Abort(ctx, call) +} + func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error { fi, err := os.Open(path) if err != nil { From 8ac495723e9bd716c9edf7c288233c3e89b6d84d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Nov 2020 17:39:31 +0100 Subject: [PATCH 4/5] gofmt --- api/apistruct/struct.go | 4 ++-- cmd/lotus-storage-miner/sealing.go | 7 ++++--- extern/sector-storage/manager.go | 7 +++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 16e93be5eaf..08b3ba9a934 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -323,8 +323,8 @@ type StorageMinerStruct struct { ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err string) error `perm:"admin" retry:"true"` ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` - SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"` - SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"` + SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"` + SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"` StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"` StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 6d972a11cae..89af1ba9d4a 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -129,7 +129,7 @@ var sealingJobsCmd = &cli.Command{ Flags: []cli.Flag{ &cli.BoolFlag{Name: "color"}, &cli.BoolFlag{ - Name: "show-ret-done", + Name: "show-ret-done", Usage: "show returned but not consumed calls", }, }, @@ -263,8 +263,8 @@ var sealingSchedDiagCmd = &cli.Command{ } var sealingAbortCmd = &cli.Command{ - Name: "abort", - Usage: "Abort a running job", + Name: "abort", + Usage: "Abort a running job", ArgsUsage: "[call id]", Action: func(cctx *cli.Context) error { if cctx.Args().Len() != 1 { @@ -289,6 +289,7 @@ var sealingAbortCmd = &cli.Command{ for _, workerJobs := range jobs { for _, j := range workerJobs { if strings.HasPrefix(j.ID.ID.String(), cctx.Args().First()) { + j := j job = &j break outer } diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 8e7d8c4e8aa..a3d6a413104 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -700,22 +700,21 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err } } - si, err := m.sched.Info(ctx) + si, err := m.sched.Info(ctx) if err != nil { return nil, err } type SchedInfo interface{} - i := struct{ + i := struct { SchedInfo ReturnedWork []string - Waiting []string + Waiting []string CallToWork map[string]string EarlyRet []string - }{ SchedInfo: si, From 498477da04a0d74dc625e051cf237edd60d83710 Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Thu, 12 Nov 2020 02:04:55 -0500 Subject: [PATCH 5/5] Nit: ArgsUsage param rename --- cmd/lotus-storage-miner/sealing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-storage-miner/sealing.go b/cmd/lotus-storage-miner/sealing.go index 89af1ba9d4a..ad890129d0b 100644 --- a/cmd/lotus-storage-miner/sealing.go +++ b/cmd/lotus-storage-miner/sealing.go @@ -265,7 +265,7 @@ var sealingSchedDiagCmd = &cli.Command{ var sealingAbortCmd = &cli.Command{ Name: "abort", Usage: "Abort a running job", - ArgsUsage: "[call id]", + ArgsUsage: "[callid]", Action: func(cctx *cli.Context) error { if cctx.Args().Len() != 1 { return xerrors.Errorf("expected 1 argument")