diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 01600cc216c..fc8589bbf6a 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -589,6 +589,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, return xerrors.Errorf("acquiring sector lock: %w", err) } + // first check if the unsealed file exists anywhere; If it doesn't ignore it unsealed := storiface.FTUnsealed { unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) @@ -601,6 +602,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, } } + // Make sure that the sealed file is still in sealing storage; In case it already + // isn't, we want to do finalize in long-term storage pathType := storiface.PathStorage { sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false) @@ -616,6 +619,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, } } + // do the cache trimming wherever the likely still very large cache lives. + // we really don't want to move it. selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, @@ -628,7 +633,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, return err } - fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage) + // get a selector for moving stuff into long-term storage + fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage) + + // only move the unsealed file if it still exists and needs moving moveUnsealed := unsealed { if len(keepUnsealed) == 0 { @@ -636,6 +644,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, } } + // move stuff to long-term storage err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), func(ctx context.Context, w Worker) error { @@ -657,6 +666,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect return xerrors.Errorf("acquiring sector lock: %w", err) } + // first check if the unsealed file exists anywhere; If it doesn't ignore it moveUnsealed := storiface.FTUnsealed { unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) @@ -669,6 +679,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } } + // Make sure that the update file is still in sealing storage; In case it already + // isn't, we want to do finalize in long-term storage pathType := storiface.PathStorage { sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false) @@ -684,7 +696,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } } - selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false) + // do the cache trimming wherever the likely still large cache lives. + // we really don't want to move it. + selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove), @@ -697,7 +711,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } move := func(types storiface.SectorFileType) error { - fetchSel := newAllocSelector(m.index, types, storiface.PathStorage) + // get a selector for moving stuff into long-term storage + fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage) { if len(keepUnsealed) == 0 { moveUnsealed = storiface.FTNone diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 312503a9c1b..c5a8b98fd79 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -584,9 +584,9 @@ func TestSched(t *testing.T) { type slowishSelector bool -func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) { +func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) { time.Sleep(200 * time.Microsecond) - return bool(s), nil + return bool(s), false, nil } func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { diff --git a/extern/sector-storage/selector_move.go b/extern/sector-storage/selector_move.go new file mode 100644 index 00000000000..1fb4c9457f7 --- /dev/null +++ b/extern/sector-storage/selector_move.go @@ -0,0 +1,96 @@ +package sectorstorage + +import ( + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) + +type moveSelector struct { + index stores.SectorIndex + sector abi.SectorID + alloc storiface.SectorFileType + destPtype storiface.PathType +} + +func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType) *moveSelector { + return &moveSelector{ + index: index, + sector: sector, + alloc: alloc, + destPtype: destPtype, + } +} + +func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { + tasks, err := whnd.TaskTypes(ctx) + if err != nil { + return false, false, xerrors.Errorf("getting supported worker task types: %w", err) + } + if _, supported := tasks[task]; !supported { + return false, false, nil + } + + paths, err := whnd.workerRpc.Paths(ctx) + if err != nil { + return false, false, xerrors.Errorf("getting worker paths: %w", err) + } + + workerPaths := map[storiface.ID]int{} + for _, path := range paths { + workerPaths[path.ID] = 0 + } + + ssize, err := spt.SectorSize() + if err != nil { + return false, false, xerrors.Errorf("getting sector size: %w", err) + } + + // note: allowFetch is always false here, because we want to find workers with + // the sector available locally + preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false) + if err != nil { + return false, false, xerrors.Errorf("finding preferred storage: %w", err) + } + + for _, info := range preferred { + if _, ok := workerPaths[info.ID]; ok { + workerPaths[info.ID]++ + } + } + + best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype) + if err != nil { + return false, false, xerrors.Errorf("finding best dest storage: %w", err) + } + + var ok bool + + for _, info := range best { + if n, has := workerPaths[info.ID]; has { + ok = true + + // if the worker has a local path with the sector already in it + // prefer that worker; This usually meant that the move operation is + // either a no-op because the sector is already in the correct path, + // or the move a local move. + if n > 0 { + return true, true, nil + } + } + } + + return ok, false, nil +} + +func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { + return a.Utilization() < b.Utilization(), nil +} + +var _ WorkerSelector = &moveSelector{} diff --git a/extern/sector-storage/selector_task.go b/extern/sector-storage/selector_task.go index 1be2c367769..d5ea9618bd2 100644 --- a/extern/sector-storage/selector_task.go +++ b/extern/sector-storage/selector_task.go @@ -19,14 +19,14 @@ func newTaskSelector() *taskSelector { return &taskSelector{} } -func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) { +func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { - return false, xerrors.Errorf("getting supported worker task types: %w", err) + return false, false, xerrors.Errorf("getting supported worker task types: %w", err) } _, supported := tasks[task] - return supported, nil + return supported, false, nil } func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {