Commit

Merge pull request #8710 from filecoin-project/feat/stor-fin-move-selector

feat: sched: Finalize* move selectors
magik6k authored May 26, 2022
2 parents cfff877 + 70f3b98 commit 7836e20
Showing 15 changed files with 241 additions and 37 deletions.
16 changes: 16 additions & 0 deletions documentation/en/default-lotus-miner-config.toml
@@ -542,6 +542,22 @@
# env var: LOTUS_STORAGE_ASSIGNER
#Assigner = "utilization"

# DisallowRemoteFinalize when set to true will force all Finalize tasks to
# run on workers with local access to both long-term storage and the sealing
# path containing the sector.
# --
# WARNING: Only set this if all workers have access to long-term storage
# paths. If this flag is enabled, and there are workers without long-term
# storage access, sectors will not be moved from them, and Finalize tasks
# will appear to be stuck.
# --
# If you see stuck Finalize tasks after enabling this setting, check
# 'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'
#
# type: bool
# env var: LOTUS_STORAGE_DISALLOWREMOTEFINALIZE
#DisallowRemoteFinalize = false
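
If all workers in the cluster can reach the long-term storage paths, the flag can be enabled by uncommenting it in the miner config. A minimal sketch, assuming the option lives in the [Storage] section alongside Assigner (the section name is inferred here from the LOTUS_STORAGE_* env var prefix):

```toml
[Storage]
  # Assumed placement; confirm every worker can reach long-term storage
  # before enabling this, or Finalize tasks may appear stuck.
  DisallowRemoteFinalize = true
```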

# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
29 changes: 24 additions & 5 deletions extern/sector-storage/manager.go
@@ -71,7 +71,8 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore

parallelCheckLimit int
parallelCheckLimit int
disallowRemoteFinalize bool

callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
@@ -123,6 +124,8 @@ type Config struct {
// PoSt config
ParallelCheckLimit int

DisallowRemoteFinalize bool

Assigner string
}
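
The new field rides alongside the existing PoSt knob. A minimal sketch of setting it programmatically (the values are illustrative, and the rest of the wiring into New is elided in this diff):

```go
cfg := sectorstorage.Config{
	ParallelCheckLimit:     128,  // illustrative value
	DisallowRemoteFinalize: true, // keep Finalize tasks on workers with local access
	Assigner:               "utilization",
}
```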

@@ -155,7 +158,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.

localProver: prover,

parallelCheckLimit: sc.ParallelCheckLimit,
parallelCheckLimit: sc.ParallelCheckLimit,
disallowRemoteFinalize: sc.DisallowRemoteFinalize,

work: mss,
callToWork: map[storiface.CallID]WorkID{},
@@ -592,6 +596,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return xerrors.Errorf("acquiring sector lock: %w", err)
}

// first check if the unsealed file exists anywhere; if it doesn't, ignore it
unsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
@@ -604,6 +609,8 @@
}
}

// Make sure that the sealed file is still in sealing storage; in case it already
// isn't, we want to run finalize in long-term storage
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
@@ -619,6 +626,8 @@
}
}

// do the cache trimming wherever the (likely still very large) cache lives;
// we really don't want to move it.
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
@@ -631,14 +640,18 @@
return err
}

fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
// get a selector for moving stuff into long-term storage
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)

// only move the unsealed file if it still exists and needs moving
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
}
}

// move stuff to long-term storage
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
@@ -660,6 +673,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return xerrors.Errorf("acquiring sector lock: %w", err)
}

// first check if the unsealed file exists anywhere; if it doesn't, ignore it
moveUnsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
@@ -672,6 +686,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}

// Make sure that the update file is still in sealing storage; in case it already
// isn't, we want to run finalize in long-term storage
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
@@ -687,7 +703,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}

selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
// do the cache trimming wherever the (likely still large) cache lives;
// we really don't want to move it.
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)

err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
@@ -700,7 +718,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}

move := func(types storiface.SectorFileType) error {
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
// get a selector for moving stuff into long-term storage
fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage, !m.disallowRemoteFinalize)
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
4 changes: 3 additions & 1 deletion extern/sector-storage/sched.go
@@ -44,7 +44,9 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error

type WorkerSelector interface {
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task
// Ok returns true if the worker is acceptable for performing the task.
// If any worker is preferred for a task, workers that aren't preferred won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)

Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
}
17 changes: 15 additions & 2 deletions extern/sector-storage/sched_assigner_common.go
@@ -61,8 +61,10 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
}()

task := (*sh.SchedQueue)[sqi]

task.IndexHeap = sqi

var havePreferred bool

for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
if !ok {
@@ -84,7 +86,7 @@
}

rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
cancel()
if err != nil {
log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
@@ -95,6 +97,17 @@
continue
}

if havePreferred && !preferred {
// we have a way better worker for this task
continue
}

if preferred && !havePreferred {
// all workers we considered previously are a much worse choice
acceptableWindows[sqi] = acceptableWindows[sqi][:0]
havePreferred = true
}

acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
}
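
The effect of the two flags on window selection is easier to see in isolation. A self-contained sketch of the same keep-only-preferred policy (simplified types, not the scheduler's actual ones):

```go
package main

import "fmt"

type candidate struct {
	window    int
	ok        bool
	preferred bool
}

// filterWindows mirrors the loop above: acceptable windows accumulate until
// the first preferred one appears, at which point earlier non-preferred
// windows are discarded and later non-preferred windows are skipped.
func filterWindows(cands []candidate) []int {
	var acceptable []int
	havePreferred := false
	for _, c := range cands {
		if !c.ok {
			continue
		}
		if havePreferred && !c.preferred {
			continue // a preferred worker already exists for this task
		}
		if c.preferred && !havePreferred {
			acceptable = acceptable[:0] // drop the much worse earlier choices
			havePreferred = true
		}
		acceptable = append(acceptable, c.window)
	}
	return acceptable
}

func main() {
	fmt.Println(filterWindows([]candidate{
		{0, true, false}, {1, true, true}, {2, true, false}, {3, true, true},
	})) // prints [1 3]
}
```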

4 changes: 2 additions & 2 deletions extern/sector-storage/sched_test.go
@@ -584,9 +584,9 @@ func TestSched(t *testing.T) {

type slowishSelector bool

func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) {
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
time.Sleep(200 * time.Microsecond)
return bool(s), nil
return bool(s), false, nil
}

func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
16 changes: 8 additions & 8 deletions extern/sector-storage/selector_alloc.go
@@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
}
}

func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

have := map[storiface.ID]struct{}{}
@@ -47,21 +47,21 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi

ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err)
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
}

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}

return false, nil
return false, false, nil
}

func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
16 changes: 8 additions & 8 deletions extern/sector-storage/selector_existing.go
@@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
}
}

func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

have := map[storiface.ID]struct{}{}
@@ -49,21 +49,21 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt

ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err)
return false, false, xerrors.Errorf("finding best storage: %w", err)
}

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}

return false, nil
return false, false, nil
}

func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
98 changes: 98 additions & 0 deletions extern/sector-storage/selector_move.go
@@ -0,0 +1,98 @@
package sectorstorage

import (
"context"

"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

type moveSelector struct {
index stores.SectorIndex
sector abi.SectorID
alloc storiface.SectorFileType
destPtype storiface.PathType
allowRemote bool
}

func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
return &moveSelector{
index: index,
sector: sector,
alloc: alloc,
destPtype: destPtype,
allowRemote: allowRemote,
}
}

func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}

workerPaths := map[storiface.ID]int{}
for _, path := range paths {
workerPaths[path.ID] = 0
}

ssize, err := spt.SectorSize()
if err != nil {
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

// note: allowFetch is always false here, because we want to find workers with
// the sector available locally
preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false)
if err != nil {
return false, false, xerrors.Errorf("finding preferred storage: %w", err)
}

for _, info := range preferred {
if _, ok := workerPaths[info.ID]; ok {
workerPaths[info.ID]++
}
}

best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype)
if err != nil {
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
}

var ok bool

for _, info := range best {
if n, has := workerPaths[info.ID]; has {
ok = true

// if the worker has a local path with the sector already in it,
// prefer that worker; this usually means that the move operation is
// either a no-op because the sector is already in the correct path,
// or a local move.
if n > 0 {
return true, true, nil
}
}
}

return ok && s.allowRemote, false, nil
}

func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

var _ WorkerSelector = &moveSelector{}
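
FinalizeSector and FinalizeReplicaUpdate construct this selector as shown in the manager.go diff above; a usage sketch, assuming the surrounding Manager fields are in scope:

```go
// Prefer workers that already hold the sector's files locally; only let a
// remote worker fetch-and-move when remote finalize is allowed.
fetchSel := newMoveSelector(
	m.index,
	sector.ID,
	storiface.FTCache|storiface.FTSealed,
	storiface.PathStorage,
	!m.disallowRemoteFinalize,
)
```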