feat: sched: Finalize* move selectors #8710

Merged (5 commits, May 26, 2022)
16 changes: 16 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
@@ -542,6 +542,22 @@
# env var: LOTUS_STORAGE_ASSIGNER
#Assigner = "utilization"

# DisallowRemoteFinalize when set to true will force all Finalize tasks to
# run on workers with local access to both long-term storage and the sealing
# path containing the sector.
# --
# WARNING: Only set this if all workers have access to long-term storage
# paths. If this flag is enabled, and there are workers without long-term
# storage access, sectors will not be moved from them, and Finalize tasks
# will appear to be stuck.
# --
# If you see stuck Finalize tasks after enabling this setting, check
# 'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'
#
# type: bool
# env var: LOTUS_STORAGE_DISALLOWREMOTEFINALIZE
#DisallowRemoteFinalize = false

# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
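For reference, the generated documentation above corresponds to a `config.toml` entry like the sketch below. This is an illustration, not part of the diff: the `[Storage]` section name is inferred from the `LOTUS_STORAGE_` env-var prefix, and the flag should stay at its default of `false` unless every worker has long-term storage paths attached.

```toml
[Storage]
  # Force Finalize tasks onto workers with local access to both the sealing
  # path holding the sector and long-term storage. Only safe when all workers
  # can reach long-term storage; otherwise Finalize tasks will appear stuck.
  DisallowRemoteFinalize = true
```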
29 changes: 24 additions & 5 deletions extern/sector-storage/manager.go
@@ -71,7 +71,8 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore

parallelCheckLimit int
parallelCheckLimit int
disallowRemoteFinalize bool

callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
@@ -123,6 +124,8 @@ type Config struct {
// PoSt config
ParallelCheckLimit int

DisallowRemoteFinalize bool

Assigner string
}

@@ -155,7 +158,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.

localProver: prover,

parallelCheckLimit: sc.ParallelCheckLimit,
parallelCheckLimit: sc.ParallelCheckLimit,
disallowRemoteFinalize: sc.DisallowRemoteFinalize,

work: mss,
callToWork: map[storiface.CallID]WorkID{},
@@ -589,6 +593,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return xerrors.Errorf("acquiring sector lock: %w", err)
}

// first check if the unsealed file exists anywhere; if it doesn't, ignore it
unsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
@@ -601,6 +606,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}

// make sure that the sealed file is still in sealing storage; if it has already
// been moved, run finalize in long-term storage instead
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
@@ -616,6 +623,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}

// do the cache trimming wherever the cache, which is likely still very large, lives;
// we really don't want to move it.
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
@@ -628,14 +637,18 @@
return err
}

fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
// get a selector for moving stuff into long-term storage
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)

// only move the unsealed file if it still exists and needs moving
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
}
}

// move stuff to long-term storage
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
@@ -657,6 +670,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return xerrors.Errorf("acquiring sector lock: %w", err)
}

// first check if the unsealed file exists anywhere; if it doesn't, ignore it
moveUnsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
@@ -669,6 +683,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}

// make sure that the update file is still in sealing storage; if it has already
// been moved, run finalize in long-term storage instead
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
@@ -684,7 +700,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}

selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
// do the cache trimming wherever the cache, which is likely still large, lives;
// we really don't want to move it.
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
@@ -697,7 +715,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}

move := func(types storiface.SectorFileType) error {
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
// get a selector for moving stuff into long-term storage
fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage, !m.disallowRemoteFinalize)
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
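Outside the diff context, the key wiring above is that the config flag lands on the Manager as `disallowRemoteFinalize`, and both finalize paths hand its negation to `newMoveSelector` as `allowRemote`. A minimal sketch of that relationship, using a hypothetical helper name that is not part of this PR and assuming the sectorstorage package context:

```go
// Hypothetical helper, shown only to highlight the flag inversion used above:
// the config option *disallows* remote finalize, while the selector argument
// *allows* remote moves, so the manager passes the negation.
func (m *Manager) longTermMoveSelector(sector storage.SectorRef, types storiface.SectorFileType) WorkerSelector {
	return newMoveSelector(m.index, sector.ID, types, storiface.PathStorage, !m.disallowRemoteFinalize)
}
```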
4 changes: 3 additions & 1 deletion extern/sector-storage/sched.go
@@ -44,7 +44,9 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error

type WorkerSelector interface {
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task
// Ok reports whether the worker is acceptable for performing the task.
// If any worker is preferred for a task, workers that are not preferred won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)

Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
}
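To illustrate the new three-value contract with something concrete, here is a hypothetical selector that is not part of this PR (it assumes the handle exposes `Info.Hostname`, as `storiface.WorkerInfo` does): returning `preferred = true` from any acceptable worker makes the assigner drop every non-preferred candidate for that task.

```go
// pinSelector is an illustrative selector only: it accepts any worker that
// supports the task, and prefers workers whose hostname matches a target.
type pinSelector struct{ target string }

func (s pinSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
	tasks, err := whnd.TaskTypes(ctx)
	if err != nil {
		return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
	}
	if _, supported := tasks[task]; !supported {
		return false, false, nil
	}
	// preferred only steers scheduling; non-matching workers stay acceptable
	return true, whnd.Info.Hostname == s.target, nil
}

func (s pinSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
	return a.Utilization() < b.Utilization(), nil
}
```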
17 changes: 15 additions & 2 deletions extern/sector-storage/sched_assigner_common.go
@@ -61,8 +61,10 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
}()

task := (*sh.SchedQueue)[sqi]

task.IndexHeap = sqi

var havePreferred bool

for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
if !ok {
@@ -84,7 +86,7 @@
}

rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
cancel()
if err != nil {
log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
@@ -95,6 +97,17 @@
continue
}

if havePreferred && !preferred {
// we have a way better worker for this task
continue
}

if preferred && !havePreferred {
// all workers we considered previously are a much worse choice
acceptableWindows[sqi] = acceptableWindows[sqi][:0]
havePreferred = true
}

acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
}

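The new `havePreferred` bookkeeping is easiest to read in isolation. The standalone sketch below (not code from this PR) reproduces just that filtering rule over a list of candidate windows: once any preferred window appears, previously collected non-preferred windows are discarded and later ones are skipped.

```go
package main

import "fmt"

type candidate struct {
	wnd       int  // index into the open scheduling windows
	ok        bool // the selector accepted the worker
	preferred bool // the selector marked the worker as preferred
}

// filterWindows mirrors the havePreferred logic in AssignerCommon.TrySched.
func filterWindows(cands []candidate) []int {
	var acceptable []int
	var havePreferred bool
	for _, c := range cands {
		if !c.ok {
			continue
		}
		if havePreferred && !c.preferred {
			// a preferred worker already exists for this task
			continue
		}
		if c.preferred && !havePreferred {
			// everything collected so far was non-preferred; start over
			acceptable = acceptable[:0]
			havePreferred = true
		}
		acceptable = append(acceptable, c.wnd)
	}
	return acceptable
}

func main() {
	fmt.Println(filterWindows([]candidate{
		{wnd: 0, ok: true}, {wnd: 1, ok: true, preferred: true},
		{wnd: 2, ok: true}, {wnd: 3, ok: true, preferred: true},
	})) // prints [1 3]
}
```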
4 changes: 2 additions & 2 deletions extern/sector-storage/sched_test.go
@@ -584,9 +584,9 @@ func TestSched(t *testing.T) {

type slowishSelector bool

func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) {
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
time.Sleep(200 * time.Microsecond)
return bool(s), nil
return bool(s), false, nil
}

func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
16 changes: 8 additions & 8 deletions extern/sector-storage/selector_alloc.go
@@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
}
}

func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

have := map[storiface.ID]struct{}{}
@@ -47,21 +47,21 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi

ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err)
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
}

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}

return false, nil
return false, false, nil
}

func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
16 changes: 8 additions & 8 deletions extern/sector-storage/selector_existing.go
@@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
}
}

func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

have := map[storiface.ID]struct{}{}
@@ -49,21 +49,21 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt

ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err)
return false, false, xerrors.Errorf("finding best storage: %w", err)
}

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}

return false, nil
return false, false, nil
}

func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
98 changes: 98 additions & 0 deletions extern/sector-storage/selector_move.go
@@ -0,0 +1,98 @@
package sectorstorage

import (
"context"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

type moveSelector struct {
index stores.SectorIndex
sector abi.SectorID
alloc storiface.SectorFileType
destPtype storiface.PathType
allowRemote bool
}

func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
return &moveSelector{
index: index,
sector: sector,
alloc: alloc,
destPtype: destPtype,
allowRemote: allowRemote,
}
}

func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

workerPaths := map[storiface.ID]int{}
for _, path := range paths {
workerPaths[path.ID] = 0
}

ssize, err := spt.SectorSize()
if err != nil {
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

// note: allowFetch is always false here, because we want to find workers with
// the sector available locally
preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false)
if err != nil {
return false, false, xerrors.Errorf("finding preferred storage: %w", err)
}

for _, info := range preferred {
if _, ok := workerPaths[info.ID]; ok {
workerPaths[info.ID]++
}
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype)
if err != nil {
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
}

var ok bool

for _, info := range best {
if n, has := workerPaths[info.ID]; has {
ok = true

// if the worker has a local path with the sector already in it,
// prefer that worker; this usually means that the move operation is
// either a no-op because the sector is already in the correct path,
// or a local move.
if n > 0 {
return true, true, nil
}
}
}

return ok && s.allowRemote, false, nil
}

func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

var _ WorkerSelector = &moveSelector{}
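For a worker that supports the task, the return values of `moveSelector.Ok` reduce to the small decision sketch below (illustrative only, not PR code). Here "can allocate here" means one of the worker's local paths appears in the `StorageBestAlloc` results, and "holds the sector" means that same local path already contains the sector files found via `StorageFindSector`.

```go
// moveDecision summarizes moveSelector.Ok for a task-capable worker.
// allowRemote corresponds to !DisallowRemoteFinalize from the miner config.
func moveDecision(canAllocHere, holdsSector, allowRemote bool) (ok, preferred bool) {
	if !canAllocHere {
		return false, false // no acceptable local destination path on this worker
	}
	if holdsSector {
		return true, true // a local (or no-op) move; prefer this worker
	}
	return allowRemote, false // remote move, only if the config permits it
}
```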