scheduling optimization
peter198 committed May 20, 2022
1 parent effee8c commit c4cfb7a
Showing 8 changed files with 114 additions and 0 deletions.
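
In short: the commit adds a per-worker cap on concurrently running PreCommit1 (P1) tasks. A new --parallel-p1-limit flag on lotus-miner run and lotus-worker run (default -1, meaning no limit) is handed to the worker through the PARALLEL_P1_LIMIT environment variable, recorded per task type in WorkerInfo.TaskLimits, checked by the scheduler before assigning a task, and shown in the sealing workers listing.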
8 changes: 8 additions & 0 deletions cmd/lotus-miner/run.go
@@ -4,6 +4,7 @@ import (
"fmt"
_ "net/http/pprof"
"os"
"strconv"

"github.com/filecoin-project/lotus/api/v1api"

@@ -49,6 +50,11 @@ var runCmd = &cli.Command{
Usage: "manage open file limit",
Value: true,
},
&cli.IntFlag{
Name: "parallel-p1-limit",
Usage: "maximum pre commit1 operations to run in parallel",
Value: -1,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Bool("enable-gpu-proving") {
@@ -58,6 +64,8 @@ var runCmd = &cli.Command{
}
}

os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))

ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
6 changes: 6 additions & 0 deletions cmd/lotus-miner/sealing.go
@@ -152,6 +152,12 @@ func workersCmd(sealing bool) *cli.Command {
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
}

plConfig, ok := stat.Info.TaskLimits[sealtasks.TTPreCommit1]
if ok && plConfig.LimitCount > 0 {
fmt.Printf("\tP1LIMIT: [%s] %d/%d tasks are running\n",
barString(float64(plConfig.LimitCount), 0, float64(plConfig.RunCount)), plConfig.RunCount, plConfig.LimitCount)
}
}

return nil
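
With a limit configured, the lotus-miner sealing workers listing gains one extra line per worker showing how many PreCommit1 tasks are running against the cap. Illustrative output (the exact bar rendering depends on barString):

        P1LIMIT: [||||||              ] 2/6 tasks are running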
8 changes: 8 additions & 0 deletions cmd/lotus-worker/main.go
@@ -9,6 +9,7 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"

@@ -208,6 +209,11 @@ var runCmd = &cli.Command{
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
},
&cli.IntFlag{
Name: "parallel-p1-limit",
Usage: "maximum precommit1 operations to run in parallel",
Value: -1,
},
},
Before: func(cctx *cli.Context) error {
if cctx.IsSet("address") {
@@ -228,6 +234,8 @@
}
}

os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))

limit, _, err := ulimit.GetLimit()
switch {
case err == ulimit.ErrUnsupported:
57 changes: 57 additions & 0 deletions extern/sector-storage/sched.go
@@ -484,6 +484,10 @@ func (sh *scheduler) trySched() {
continue
}

if !sh.CanHandleTask(task.taskType, wid) {
continue
}

wu, found := workerUtil[wid]
if !found {
wu = w.utilization()
@@ -503,6 +507,7 @@
// #--------> acceptableWindow index
//
// * -> we're here
sh.TaskAdd(task.taskType, bestWid)
break
}

@@ -610,3 +615,55 @@ func (sh *scheduler) Close(ctx context.Context) error {
}
return nil
}

func (sh *scheduler) CanHandleTask(taskType sealtasks.TaskType, wid storiface.WorkerID) (flag bool) {
if wh, ok := sh.workers[wid]; ok {
wh.info.TaskLimitLk.Lock()
defer wh.info.TaskLimitLk.Unlock()
taskLimit, ok := wh.info.TaskLimits[taskType]
if !ok {
flag = true
return
}
log.Debugf("CanHandleTask: %v:%v", taskLimit.LimitCount, taskLimit.RunCount)
if taskLimit.LimitCount > 0 {
freeCount := taskLimit.LimitCount - taskLimit.RunCount
if freeCount > 0 {
flag = true
}
} else {
flag = true
}
} else {
flag = true
}
return
}

func (sh *scheduler) TaskAdd(taskType sealtasks.TaskType, wid storiface.WorkerID) {
log.Debugf("begin task add:%v-%v", wid, taskType)
if wh, ok := sh.workers[wid]; ok {
wh.info.TaskLimitLk.Lock()
defer wh.info.TaskLimitLk.Unlock()
taskLimit, ok := wh.info.TaskLimits[taskType]
if ok {
log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
taskLimit.RunCount++
}
}

}

func (sh *scheduler) TaskReduce(taskType sealtasks.TaskType, wid storiface.WorkerID) {
log.Debugf("begin task reduce:%v-%v", wid, taskType)
if wh, ok := sh.workers[wid]; ok {
wh.info.TaskLimitLk.Lock()
defer wh.info.TaskLimitLk.Unlock()
taskLimit, ok := wh.info.TaskLimits[taskType]
if ok {
log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
taskLimit.RunCount--
}
}

}
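
Taken together, CanHandleTask, TaskAdd and TaskReduce act as a per-worker counting semaphore keyed by task type: a missing entry or a non-positive LimitCount means unlimited, otherwise a task is only scheduled while RunCount is below LimitCount. The sketch below condenses that behaviour into a self-contained example; the types are local stand-ins (limitConfig mirrors storiface.LimitConfig), and, unlike the diff, the capacity check and the increment are folded into a single locked tryAcquire rather than being split between CanHandleTask and TaskAdd in trySched.

package main

import (
	"fmt"
	"sync"
)

// limitConfig mirrors storiface.LimitConfig from the diff.
type limitConfig struct {
	LimitCount int // <= 0 means "no limit"
	RunCount   int
}

// taskLimiter is a stand-in for the per-worker TaskLimits map plus its mutex.
type taskLimiter struct {
	mu     sync.Mutex
	limits map[string]*limitConfig
}

// tryAcquire reports whether another task of this type may start and, if so,
// records it as running (CanHandleTask + TaskAdd folded into one step).
func (t *taskLimiter) tryAcquire(taskType string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	lc, ok := t.limits[taskType]
	if !ok || lc.LimitCount <= 0 {
		return true // no entry or non-positive limit: unlimited
	}
	if lc.RunCount < lc.LimitCount {
		lc.RunCount++
		return true
	}
	return false
}

// release mirrors TaskReduce: a finished task frees one slot.
func (t *taskLimiter) release(taskType string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if lc, ok := t.limits[taskType]; ok && lc.RunCount > 0 {
		lc.RunCount--
	}
}

func main() {
	tl := &taskLimiter{limits: map[string]*limitConfig{
		"seal/v0/precommit/1": {LimitCount: 2},
	}}
	fmt.Println(tl.tryAcquire("seal/v0/precommit/1")) // true  (1/2 running)
	fmt.Println(tl.tryAcquire("seal/v0/precommit/1")) // true  (2/2 running)
	fmt.Println(tl.tryAcquire("seal/v0/precommit/1")) // false (limit reached)
	tl.release("seal/v0/precommit/1")
	fmt.Println(tl.tryAcquire("seal/v0/precommit/1")) // true  (slot freed)
}

In the diff the check and the increment stay separate because trySched evaluates several candidate workers before settling on bestWid; the combined form here is only for illustration of the semantics.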
6 changes: 6 additions & 0 deletions extern/sector-storage/sched_test.go
@@ -161,9 +161,15 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
}

func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
LimitCount: 6,
RunCount: 0,
}
return storiface.WorkerInfo{
Hostname: s.name,
IgnoreResources: s.ignoreResources,
TaskLimits: taskLimits,
Resources: s.resources,
}, nil
}
2 changes: 2 additions & 0 deletions extern/sector-storage/sched_worker.go
@@ -526,6 +526,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
if err != nil {
log.Errorf("error executing worker (withResources): %+v", err)
}
sh.TaskReduce(req.taskType, sw.wid)
}()

return nil
@@ -555,6 +556,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
w.lk.Lock()

w.active.free(w.info.Resources, needRes)
sh.TaskReduce(req.taskType, sw.wid)

select {
case sw.taskDone <- struct{}{}:
9 changes: 9 additions & 0 deletions extern/sector-storage/storiface/worker.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/google/uuid"
@@ -31,6 +32,14 @@ type WorkerInfo struct {
// Default should be false (zero value, i.e. resources taken into account).
IgnoreResources bool
Resources WorkerResources

TaskLimits map[sealtasks.TaskType]*LimitConfig
TaskLimitLk sync.Mutex
}

type LimitConfig struct {
LimitCount int
RunCount int
}

type WorkerResources struct {
18 changes: 18 additions & 0 deletions extern/sector-storage/worker_local.go
@@ -7,6 +7,7 @@ import (
"os"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -797,9 +798,26 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
}

// parallel-p1-limit
p1Limit := -1
if limit, ok := os.LookupEnv("PARALLEL_P1_LIMIT"); ok {
li, err := strconv.Atoi(limit)
if err != nil {
log.Errorf("failed to parse PARALLEL_P1_LIMIT env var, default=-1")
} else {
p1Limit = li
}
}
taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
LimitCount: p1Limit,
RunCount: 0,
}

return storiface.WorkerInfo{
Hostname: hostname,
IgnoreResources: l.ignoreResources,
TaskLimits: taskLimits,
Resources: storiface.WorkerResources{
MemPhysical: memPhysical,
MemUsed: memUsed,
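
End to end, the limit travels as CLI flag -> PARALLEL_P1_LIMIT environment variable -> LocalWorker.Info() -> WorkerInfo.TaskLimits -> scheduler. A minimal, self-contained sketch of the parsing step above, with the same fallback to -1 (no limit) on a missing or malformed value:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// parallelP1Limit reads PARALLEL_P1_LIMIT, returning -1 (treated as
// "unlimited" by the scheduler) when the variable is unset or unparsable.
func parallelP1Limit() int {
	v, ok := os.LookupEnv("PARALLEL_P1_LIMIT")
	if !ok {
		return -1
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return -1
	}
	return n
}

func main() {
	os.Setenv("PARALLEL_P1_LIMIT", "4")
	fmt.Println(parallelP1Limit()) // 4
	os.Unsetenv("PARALLEL_P1_LIMIT")
	fmt.Println(parallelP1Limit()) // -1
}

With the default of -1, LimitCount stays non-positive, CanHandleTask always returns true, and scheduling behaviour is unchanged.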
