Skip to content

Commit

Permalink
Merge pull request #8700 from filecoin-project/feat/multi-sched
Browse files Browse the repository at this point in the history
feat: sched: Add scheduler interfaces, configurable assigner
  • Loading branch information
magik6k authored May 26, 2022
2 parents 6f2c8d6 + 3de34ea commit cfff877
Show file tree
Hide file tree
Showing 24 changed files with 686 additions and 485 deletions.
8 changes: 8 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,14 @@
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true

# Assigner specifies the worker assigner to use when scheduling tasks.
# "utilization" (default) - assign tasks to workers with lowest utilization.
# "spread" - assign tasks to as many distinct workers as possible.
#
# type: string
# env var: LOTUS_STORAGE_ASSIGNER
#Assigner = "utilization"

# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
Expand Down
11 changes: 9 additions & 2 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Manager struct {
remoteHnd *stores.FetchHandler
index stores.SectorIndex

sched *scheduler
sched *Scheduler
windowPoStSched *poStScheduler
winningPoStSched *poStScheduler

Expand Down Expand Up @@ -122,6 +122,8 @@ type Config struct {

// PoSt config
ParallelCheckLimit int

Assigner string
}

type StorageAuth http.Header
Expand All @@ -135,14 +137,19 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
return nil, xerrors.Errorf("creating prover instance: %w", err)
}

sh, err := newScheduler(sc.Assigner)
if err != nil {
return nil, err
}

m := &Manager{
ls: ls,
storage: stor,
localStore: lstor,
remoteHnd: &stores.FetchHandler{Local: lstor, PfHandler: &stores.DefaultPartialFileHandler{}},
index: si,

sched: newScheduler(),
sched: sh,
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),

Expand Down
5 changes: 4 additions & 1 deletion extern/sector-storage/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,17 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man

stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{})

sh, err := newScheduler("")
require.NoError(t, err)

m := &Manager{
ls: st,
storage: stor,
localStore: lstor,
remoteHnd: &stores.FetchHandler{Local: lstor},
index: si,

sched: newScheduler(),
sched: sh,
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),

Expand Down
24 changes: 12 additions & 12 deletions extern/sector-storage/request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,42 @@ package sectorstorage

import "sort"

type requestQueue []*workerRequest
type RequestQueue []*WorkerRequest

func (q requestQueue) Len() int { return len(q) }
func (q RequestQueue) Len() int { return len(q) }

func (q requestQueue) Less(i, j int) bool {
oneMuchLess, muchLess := q[i].taskType.MuchLess(q[j].taskType)
func (q RequestQueue) Less(i, j int) bool {
oneMuchLess, muchLess := q[i].TaskType.MuchLess(q[j].TaskType)
if oneMuchLess {
return muchLess
}

if q[i].priority != q[j].priority {
return q[i].priority > q[j].priority
if q[i].Priority != q[j].Priority {
return q[i].Priority > q[j].Priority
}

if q[i].taskType != q[j].taskType {
return q[i].taskType.Less(q[j].taskType)
if q[i].TaskType != q[j].TaskType {
return q[i].TaskType.Less(q[j].TaskType)
}

return q[i].sector.ID.Number < q[j].sector.ID.Number // optimize minerActor.NewSectors bitfield
return q[i].Sector.ID.Number < q[j].Sector.ID.Number // optimize minerActor.NewSectors bitfield
}

func (q requestQueue) Swap(i, j int) {
func (q RequestQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}

func (q *requestQueue) Push(x *workerRequest) {
func (q *RequestQueue) Push(x *WorkerRequest) {
n := len(*q)
item := x
item.index = n
*q = append(*q, item)
sort.Sort(q)
}

func (q *requestQueue) Remove(i int) *workerRequest {
func (q *RequestQueue) Remove(i int) *WorkerRequest {
old := *q
n := len(old)
item := old[i]
Expand Down
30 changes: 15 additions & 15 deletions extern/sector-storage/request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (
)

func TestRequestQueue(t *testing.T) {
rq := &requestQueue{}
rq := &RequestQueue{}

rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit2})
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTAddPiece})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit1})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit2})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit1})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTAddPiece})

dump := func(s string) {
fmt.Println("---")
fmt.Println(s)

for sqi := 0; sqi < rq.Len(); sqi++ {
task := (*rq)[sqi]
fmt.Println(sqi, task.taskType)
fmt.Println(sqi, task.TaskType)
}
}

Expand All @@ -32,31 +32,31 @@ func TestRequestQueue(t *testing.T) {

dump("pop 1")

if pt.taskType != sealtasks.TTPreCommit2 {
t.Error("expected precommit2, got", pt.taskType)
if pt.TaskType != sealtasks.TTPreCommit2 {
t.Error("expected precommit2, got", pt.TaskType)
}

pt = rq.Remove(0)

dump("pop 2")

if pt.taskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.taskType)
if pt.TaskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.TaskType)
}

pt = rq.Remove(1)

dump("pop 3")

if pt.taskType != sealtasks.TTAddPiece {
t.Error("expected addpiece, got", pt.taskType)
if pt.TaskType != sealtasks.TTAddPiece {
t.Error("expected addpiece, got", pt.TaskType)
}

pt = rq.Remove(0)

dump("pop 4")

if pt.taskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.taskType)
if pt.TaskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.TaskType)
}
}
Loading

0 comments on commit cfff877

Please sign in to comment.