Commit

Merge branch 'feat/SchedRemoveRequest' of github.com:LexLuthr/lotus into LexLuthr-feat/SchedRemoveRequest
magik6k committed Aug 5, 2022
2 parents adae785 + ffc10d8 commit 11e4914
Showing 13 changed files with 201 additions and 4 deletions.
2 changes: 2 additions & 0 deletions api/api_storage.go
@@ -150,6 +150,8 @@ type StorageMiner interface {
// SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
// SealingRemoveRequest removes a request from the sealing pipeline
SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin

// paths.SectorIndex
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
13 changes: 13 additions & 0 deletions api/proxy_gen.go

Generated file; diff not rendered.

Binary file modified build/openrpc/full.json.gz
Binary file modified build/openrpc/gateway.json.gz
Binary file modified build/openrpc/miner.json.gz
Binary file modified build/openrpc/worker.json.gz
14 changes: 14 additions & 0 deletions cmd/lotus-miner/sealing.go
@@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{
Name: "abort",
Usage: "Abort a running job",
ArgsUsage: "[callid]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "sched",
Usage: "Specifies that the argument is UUID of the request to be removed from scheduler",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
@@ -378,6 +384,14 @@ var sealingAbortCmd = &cli.Command{

ctx := lcli.ReqContext(cctx)

if cctx.Bool("sched") {
err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First())))
if err != nil {
return xerrors.Errorf("Failed to removed the request with UUID %s: %w", cctx.Args().First(), err)
}
return nil
}

jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
16 changes: 16 additions & 0 deletions documentation/en/api-v0-methods-miner.md
@@ -128,6 +128,7 @@
* [RuntimeSubsystems](#RuntimeSubsystems)
* [Sealing](#Sealing)
* [SealingAbort](#SealingAbort)
* [SealingRemoveRequest](#SealingRemoveRequest)
* [SealingSchedDiag](#SealingSchedDiag)
* [Sector](#Sector)
* [SectorAbortUpgrade](#SectorAbortUpgrade)
@@ -2749,6 +2750,21 @@ Inputs:

Response: `{}`

### SealingRemoveRequest
SealingRemoveRequest removes a request from the sealing pipeline


Perms: admin

Inputs:
```json
[
"07070707-0707-0707-0707-070707070707"
]
```

Response: `{}`
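
For illustration only (this snippet is not part of the commit), a minimal Go sketch of calling the new endpoint through the lotus RPC client; the listen address and admin token below are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/google/uuid"

	"github.com/filecoin-project/lotus/api/client"
)

func main() {
	ctx := context.Background()

	// Placeholder miner API endpoint and admin token.
	headers := http.Header{"Authorization": []string{"Bearer <ADMIN_TOKEN>"}}
	miner, closer, err := client.NewStorageMinerRPCV0(ctx, "ws://127.0.0.1:2345/rpc/v0", headers)
	if err != nil {
		panic(err)
	}
	defer closer()

	// SchedId of the queued request, as reported by SealingSchedDiag.
	schedID := uuid.MustParse("07070707-0707-0707-0707-070707070707")
	if err := miner.SealingRemoveRequest(ctx, schedID); err != nil {
		fmt.Println("remove failed:", err)
		return
	}
	fmt.Println("request removed")
}
```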

### SealingSchedDiag
SealingSchedDiag dumps internal sealing scheduler state

2 changes: 1 addition & 1 deletion documentation/en/cli-lotus-miner.md
@@ -2342,7 +2342,7 @@ USAGE:
lotus-miner sealing abort [command options] [callid]
OPTIONS:
--help, -h show help (default: false)
--sched Specifies that the argument is the UUID of the request to be removed from the scheduler (default: false)
```
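
For example, assuming the SchedId of a stuck request obtained from the scheduler diagnostics (SealingSchedDiag), removal would be invoked as:

```
lotus-miner sealing abort --sched 07070707-0707-0707-0707-070707070707
```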

88 changes: 88 additions & 0 deletions itests/worker_test.go
@@ -2,25 +2,29 @@ package itests

import (
"context"
"encoding/json"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/wdpost"
@@ -402,6 +406,90 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
require.Len(t, lastPending, 0)
}

func TestSchedulerRemoveRequest(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs

ens.InterconnectAll().BeginMining(50 * time.Millisecond)

e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)

type info struct {
CallToWork struct {
} `json:"CallToWork"`
EarlyRet interface{} `json:"EarlyRet"`
ReturnedWork interface{} `json:"ReturnedWork"`
SchedInfo struct {
OpenWindows []string `json:"OpenWindows"`
Requests []struct {
Priority int `json:"Priority"`
SchedID string `json:"SchedId"`
Sector struct {
Miner int `json:"Miner"`
Number int `json:"Number"`
} `json:"Sector"`
TaskType string `json:"TaskType"`
} `json:"Requests"`
} `json:"SchedInfo"`
Waiting interface{} `json:"Waiting"`
}

tocheck := miner.StartPledge(ctx, 1, 0, nil)
var sn abi.SectorNumber
for n := range tocheck {
sn = n
}
// Keep checking until the sector state is PC2; the request should get stuck since the worker cannot process PC2
for {
st, err := miner.SectorsStatus(ctx, sn, false)
require.NoError(t, err)
if st.State == api.SectorState(sealing.PreCommit2) {
break
}
time.Sleep(time.Second)
}

// Dump current scheduler info
schedb, err := miner.SealingSchedDiag(ctx, false)
require.NoError(t, err)

j, err := json.MarshalIndent(&schedb, "", " ")
require.NoError(t, err)

var b info
err = json.Unmarshal(j, &b)
require.NoError(t, err)

var schedidb uuid.UUID

// Parse the scheduler info to get the request UUID, then call SealingRemoveRequest()
require.Len(t, b.SchedInfo.Requests, 1)
require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)

schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
require.NoError(t, err)

err = miner.SealingRemoveRequest(ctx, schedidb)
require.NoError(t, err)

// Dump the scheduler state again and verify the removed request is gone;
// the test passes if no requests remain
scheda, err := miner.SealingSchedDiag(ctx, false)
require.NoError(t, err)

k, err := json.MarshalIndent(&scheda, "", " ")
require.NoError(t, err)

var a info
err = json.Unmarshal(k, &a)
require.NoError(t, err)

require.Len(t, a.SchedInfo.Requests, 0)
}

func TestWorkerName(t *testing.T) {
name := "thisstringisprobablynotahostnameihope"

4 changes: 4 additions & 0 deletions node/impl/storminer.go
@@ -462,6 +462,10 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.Call
return sm.StorageMgr.Abort(ctx, call)
}

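// SealingRemoveRequest implements the new API endpoint by delegating to the sealer manager.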
func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error {
return sm.StorageMgr.RemoveSchedRequest(ctx, schedId)
}

func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path)
if err != nil {
4 changes: 4 additions & 0 deletions storage/sealer/manager.go
@@ -1171,6 +1171,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
return i, nil
}

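// RemoveSchedRequest forwards the removal of a scheduling request, identified by its SchedId, to the sealing scheduler.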
func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error {
return m.sched.RemoveRequest(ctx, schedId)
}

func (m *Manager) Close(ctx context.Context) error {
m.windowPoStSched.schedClose()
m.winningPoStSched.schedClose()
62 changes: 59 additions & 3 deletions storage/sealer/sched.go
@@ -68,7 +68,8 @@ type Scheduler struct {

workTracker *workTracker

info chan func(interface{})
info chan func(interface{})
rmRequest chan *rmRequest

closing chan struct{}
closed chan struct{}
@@ -122,6 +123,7 @@ type WorkerRequest struct {
TaskType sealtasks.TaskType
Priority int // larger values more important
Sel WorkerSelector
SchedId uuid.UUID

prepare WorkerAction
work WorkerAction
@@ -139,6 +141,11 @@ type workerResponse struct {
err error
}

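// rmRequest asks the scheduler event loop to remove the queued request with
// the given id; the outcome of the removal is delivered on res.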
type rmRequest struct {
id uuid.UUID
res chan error
}

func newScheduler(assigner string) (*Scheduler, error) {
var a Assigner
switch assigner {
@@ -168,7 +175,8 @@ func newScheduler(assigner string) (*Scheduler, error) {
prepared: map[uuid.UUID]trackedWork{},
},

info: make(chan func(interface{})),
info: make(chan func(interface{})),
rmRequest: make(chan *rmRequest),

closing: make(chan struct{}),
closed: make(chan struct{}),
Expand All @@ -184,6 +192,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t
TaskType: taskType,
Priority: getPriority(ctx),
Sel: sel,
SchedId: uuid.New(),

prepare: prepare,
work: work,
@@ -228,6 +237,7 @@ type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Priority int
SchedId uuid.UUID
}

type SchedDiagInfo struct {
@@ -246,6 +256,9 @@ func (sh *Scheduler) runSched() {
var toDisable []workerDisableReq

select {
case rmreq := <-sh.rmRequest:
sh.removeRequest(rmreq)
doSched = true
case <-sh.workerChange:
doSched = true
case dreq := <-sh.workerDisable:
@@ -263,7 +276,6 @@
doSched = true
case ireq := <-sh.info:
ireq(sh.diag())

case <-iw:
initialised = true
iw = nil
@@ -332,6 +344,7 @@ func (sh *Scheduler) diag() SchedDiagInfo {
Sector: task.Sector.ID,
TaskType: task.TaskType,
Priority: task.Priority,
SchedId: task.SchedId,
})
}

@@ -381,6 +394,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
}
}

// removeRequest removes a queued request by SchedId. It is only called from
// the scheduler event loop (runSched), which owns SchedQueue.
func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {
if sh.SchedQueue.Len() == 0 {
rmrequest.res <- xerrors.New("no requests in the scheduler")
return
}

queue := sh.SchedQueue
for i, r := range *queue {
if r.SchedId == rmrequest.id {
queue.Remove(i)
rmrequest.res <- nil
// Unblock the caller waiting in Schedule.
go r.respond(xerrors.New("scheduling request removed"))
return
}
}
rmrequest.res <- xerrors.New("no request with the provided SchedId found")
}

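// RemoveRequest hands a removal request to the scheduler event loop via the
// rmRequest channel and waits for the result, honoring scheduler shutdown and
// context cancellation.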
func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error {
ret := make(chan error, 1)

select {
case sh.rmRequest <- &rmRequest{
id: schedId,
res: ret,
}:
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}

select {
case resp := <-ret:
return resp
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
}

func (sh *Scheduler) Close(ctx context.Context) error {
close(sh.closing)
select {
