diff --git a/cmd/lotus-miner/run.go b/cmd/lotus-miner/run.go
index f276f319c9b..03ebf9e1c25 100644
--- a/cmd/lotus-miner/run.go
+++ b/cmd/lotus-miner/run.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	_ "net/http/pprof"
 	"os"
+	"strconv"
 
 	"github.com/filecoin-project/lotus/api/v1api"
 
@@ -49,6 +50,11 @@ var runCmd = &cli.Command{
 			Usage: "manage open file limit",
 			Value: true,
 		},
+		&cli.IntFlag{
+			Name:  "parallel-p1-limit",
+			Usage: "maximum precommit1 operations to run in parallel",
+			Value: -1,
+		},
 	},
 	Action: func(cctx *cli.Context) error {
 		if !cctx.Bool("enable-gpu-proving") {
@@ -58,6 +64,8 @@ var runCmd = &cli.Command{
 			}
 		}
 
+		os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))
+
 		ctx, _ := tag.New(lcli.DaemonContext(cctx),
 			tag.Insert(metrics.Version, build.BuildVersion),
 			tag.Insert(metrics.Commit, build.CurrentCommit),
diff --git a/cmd/lotus-miner/sealing.go b/cmd/lotus-miner/sealing.go
index 4f048ad1750..fab7fb8808a 100644
--- a/cmd/lotus-miner/sealing.go
+++ b/cmd/lotus-miner/sealing.go
@@ -152,6 +152,12 @@ func workersCmd(sealing bool) *cli.Command {
 			for _, gpu := range stat.Info.Resources.GPUs {
 				fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
 			}
+
+			plConfig, ok := stat.Info.TaskLimits[sealtasks.TTPreCommit1]
+			if ok && plConfig.LimitCount > 0 {
+				fmt.Printf("\tP1LIMIT: [%s] %d/%d tasks are running\n",
+					barString(float64(plConfig.LimitCount), 0, float64(plConfig.RunCount)), plConfig.RunCount, plConfig.LimitCount)
+			}
 		}
 
 		return nil
diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go
index 83c821105a5..88d31282acb 100644
--- a/cmd/lotus-worker/main.go
+++ b/cmd/lotus-worker/main.go
@@ -9,6 +9,7 @@ import (
 	"net/http"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"time"
 
@@ -208,6 +209,11 @@ var runCmd = &cli.Command{
 			Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
 			Value: "30m",
 		},
+		&cli.IntFlag{
+			Name:  "parallel-p1-limit",
+			Usage: "maximum precommit1 operations to run in parallel",
+			Value: -1,
+		},
 	},
 	Before: func(cctx *cli.Context) error {
 		if cctx.IsSet("address") {
@@ -228,6 +234,8 @@ var runCmd = &cli.Command{
 			}
 		}
 
+		os.Setenv("PARALLEL_P1_LIMIT", strconv.Itoa(cctx.Int("parallel-p1-limit")))
+
 		limit, _, err := ulimit.GetLimit()
 		switch {
 		case err == ulimit.ErrUnsupported:
diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go
index eb2d17c5926..cbe9487978d 100644
--- a/extern/sector-storage/sched.go
+++ b/extern/sector-storage/sched.go
@@ -484,6 +484,10 @@ func (sh *scheduler) trySched() {
 				continue
 			}
 
+			if !sh.CanHandleTask(task.taskType, wid) {
+				continue
+			}
+
 			wu, found := workerUtil[wid]
 			if !found {
 				wu = w.utilization()
@@ -503,6 +507,7 @@ func (sh *scheduler) trySched() {
 		//  #--------> acceptableWindow index
 		//
 		//  * -> we're here
+		sh.TaskAdd(task.taskType, bestWid)
 		break
 	}
@@ -610,3 +615,55 @@ func (sh *scheduler) Close(ctx context.Context) error {
 	}
 	return nil
 }
+
+func (sh *scheduler) CanHandleTask(taskType sealtasks.TaskType, wid storiface.WorkerID) (flag bool) {
+	if wh, ok := sh.workers[wid]; ok {
+		wh.info.TaskLimitLk.Lock()
+		defer wh.info.TaskLimitLk.Unlock()
+		taskLimit, ok := wh.info.TaskLimits[taskType]
+		if !ok {
+			flag = true
+			return
+		}
+		log.Debugf("CanHandleTask: %v:%v", taskLimit.LimitCount, taskLimit.RunCount)
+		if taskLimit.LimitCount > 0 {
+			freeCount := taskLimit.LimitCount - taskLimit.RunCount
+			if freeCount > 0 {
+				flag = true
+			}
+		} else {
+			flag = true
+		}
+	} else {
+		flag = true
+	}
+	return
+}
+
+func (sh *scheduler) TaskAdd(taskType sealtasks.TaskType, wid storiface.WorkerID) {
+	log.Debugf("begin task add:%v-%v", wid, taskType)
+	if wh, ok := sh.workers[wid]; ok {
+		wh.info.TaskLimitLk.Lock()
+		defer wh.info.TaskLimitLk.Unlock()
+		taskLimit, ok := wh.info.TaskLimits[taskType]
+		if ok {
+			log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
+			taskLimit.RunCount++
+		}
+	}
+
+}
+
+func (sh *scheduler) TaskReduce(taskType sealtasks.TaskType, wid storiface.WorkerID) {
+	log.Debugf("begin task reduce:%v-%v", wid, taskType)
+	if wh, ok := sh.workers[wid]; ok {
+		wh.info.TaskLimitLk.Lock()
+		defer wh.info.TaskLimitLk.Unlock()
+		taskLimit, ok := wh.info.TaskLimits[taskType]
+		if ok {
+			log.Debugf("task limit:%v-%v", taskLimit.LimitCount, taskLimit.RunCount)
+			taskLimit.RunCount--
+		}
+	}
+
+}
diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go
index 77e67479345..5b01e25ad42 100644
--- a/extern/sector-storage/sched_test.go
+++ b/extern/sector-storage/sched_test.go
@@ -161,9 +161,15 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
 }
 
 func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
+	taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
+	taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
+		LimitCount: 6,
+		RunCount:   0,
+	}
 	return storiface.WorkerInfo{
 		Hostname:        s.name,
 		IgnoreResources: s.ignoreResources,
+		TaskLimits:      taskLimits,
 		Resources:       s.resources,
 	}, nil
 }
diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go
index f0a85ea3fdb..bebd3a75afd 100644
--- a/extern/sector-storage/sched_worker.go
+++ b/extern/sector-storage/sched_worker.go
@@ -526,6 +526,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 		if err != nil {
 			log.Errorf("error executing worker (withResources): %+v", err)
 		}
+		sh.TaskReduce(req.taskType, sw.wid)
 	}()
 
 	return nil
@@ -555,6 +556,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
 
 		w.lk.Lock()
 		w.active.free(w.info.Resources, needRes)
+		sh.TaskReduce(req.taskType, sw.wid)
 		select {
 		case sw.taskDone <- struct{}{}:
diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go
index 5b4fabf0248..b1b89443d7d 100644
--- a/extern/sector-storage/storiface/worker.go
+++ b/extern/sector-storage/storiface/worker.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/google/uuid"
@@ -31,6 +32,14 @@ type WorkerInfo struct {
 	// Default should be false (zero value, i.e. resources taken into account).
 	IgnoreResources bool
 	Resources       WorkerResources
+
+	TaskLimits  map[sealtasks.TaskType]*LimitConfig
+	TaskLimitLk sync.Mutex
+}
+
+type LimitConfig struct {
+	LimitCount int
+	RunCount   int
 }
 
 type WorkerResources struct {
diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go
index 9a14e42b5f1..982dc5140f5 100644
--- a/extern/sector-storage/worker_local.go
+++ b/extern/sector-storage/worker_local.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"reflect"
 	"runtime"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -797,9 +798,26 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 		return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
 	}
 
+	// parallel-p1-limit
+	p1Limit := -1
+	if limit, ok := os.LookupEnv("PARALLEL_P1_LIMIT"); ok {
+		li, err := strconv.Atoi(limit)
+		if err != nil {
+			log.Errorf("failed to parse PARALLEL_P1_LIMIT env var, default=-1")
+		} else {
+			p1Limit = li
+		}
+	}
+	taskLimits := make(map[sealtasks.TaskType]*storiface.LimitConfig)
+	taskLimits[sealtasks.TTPreCommit1] = &storiface.LimitConfig{
+		LimitCount: p1Limit,
+		RunCount:   0,
+	}
+
 	return storiface.WorkerInfo{
 		Hostname:        hostname,
 		IgnoreResources: l.ignoreResources,
+		TaskLimits:      taskLimits,
 		Resources: storiface.WorkerResources{
 			MemPhysical: memPhysical,
 			MemUsed:     memUsed,
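Reviewer note: the scheduler change above reduces to a guarded per-task-type counter: CanHandleTask gates dispatch, TaskAdd increments when a task is assigned, and TaskReduce decrements when it finishes, all under WorkerInfo.TaskLimitLk. Below is a minimal standalone sketch of that pattern, not part of the patch: the limiter type and the main driver are illustrative inventions, while the field names and the "non-positive LimitCount means unlimited" rule mirror the PR's LimitConfig.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // LimitConfig mirrors the struct the PR adds to storiface:
    // LimitCount <= 0 means "no limit"; RunCount tracks in-flight tasks.
    type LimitConfig struct {
    	LimitCount int
    	RunCount   int
    }

    // limiter is a hypothetical stand-in for the scheduler's per-worker
    // state (WorkerInfo.TaskLimits guarded by WorkerInfo.TaskLimitLk).
    type limiter struct {
    	lk     sync.Mutex
    	limits map[string]*LimitConfig
    }

    // canHandle mirrors CanHandleTask: a missing entry or a non-positive
    // LimitCount means the task type is unrestricted.
    func (l *limiter) canHandle(task string) bool {
    	l.lk.Lock()
    	defer l.lk.Unlock()
    	cfg, ok := l.limits[task]
    	if !ok || cfg.LimitCount <= 0 {
    		return true
    	}
    	return cfg.RunCount < cfg.LimitCount
    }

    // add and reduce bracket task execution, like TaskAdd and TaskReduce;
    // both are no-ops when no limit entry exists for the task type.
    func (l *limiter) add(task string) {
    	l.lk.Lock()
    	defer l.lk.Unlock()
    	if cfg, ok := l.limits[task]; ok {
    		cfg.RunCount++
    	}
    }

    func (l *limiter) reduce(task string) {
    	l.lk.Lock()
    	defer l.lk.Unlock()
    	if cfg, ok := l.limits[task]; ok {
    		cfg.RunCount--
    	}
    }

    func main() {
    	// Allow two concurrent PreCommit1 tasks, as with --parallel-p1-limit=2.
    	l := &limiter{limits: map[string]*LimitConfig{
    		"seal/v0/precommit/1": {LimitCount: 2},
    	}}
    	for i := 0; i < 3; i++ {
    		if l.canHandle("seal/v0/precommit/1") {
    			l.add("seal/v0/precommit/1")
    			fmt.Println("dispatched task", i)
    		} else {
    			fmt.Println("task", i, "deferred: limit reached")
    		}
    	}
    }

As in the patch itself, the counter is only adjusted when a limit entry exists, and the CLI default of --parallel-p1-limit=-1 yields a non-positive LimitCount, which leaves PreCommit1 scheduling unrestricted.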