Skip to content

Commit

Permalink
sched: Strong preferrence in WorkerSelector
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed May 23, 2022
1 parent 3de34ea commit b576008
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
4 changes: 3 additions & 1 deletion extern/sector-storage/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error

type WorkerSelector interface {
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)

Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
}
Expand Down
17 changes: 15 additions & 2 deletions extern/sector-storage/sched_assigner_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
}()

task := (*sh.SchedQueue)[sqi]

task.IndexHeap = sqi

var havePreferred bool

for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
if !ok {
Expand All @@ -84,7 +86,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
}

rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
cancel()
if err != nil {
log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
Expand All @@ -95,6 +97,17 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
continue
}

if havePreferred && !preferred {
// we have a way better worker for this task
continue
}

if preferred && !havePreferred {
// all workers we considered previously are much worse choice
acceptableWindows[sqi] = acceptableWindows[sqi][:0]
havePreferred = true
}

acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
}

Expand Down
16 changes: 8 additions & 8 deletions extern/sector-storage/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
}
}

func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

have := map[storiface.ID]struct{}{}
Expand All @@ -47,21 +47,21 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi

ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err)
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
}

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}

return false, nil
return false, false, nil
}

func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
Expand Down
16 changes: 8 additions & 8 deletions extern/sector-storage/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
}
}

func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

have := map[storiface.ID]struct{}{}
Expand All @@ -49,21 +49,21 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt

ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err)
return false, false, xerrors.Errorf("finding best storage: %w", err)
}

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}

return false, nil
return false, false, nil
}

func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
Expand Down

0 comments on commit b576008

Please sign in to comment.