diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go index 03ae292587a..2ebc5b620a2 100644 --- a/extern/sector-storage/worker_tracked.go +++ b/extern/sector-storage/worker_tracked.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/ipfs/go-cid" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -28,7 +29,7 @@ type workTracker struct { done map[storiface.CallID]struct{} running map[storiface.CallID]trackedWork - prepared map[storiface.CallID]trackedWork + prepared map[uuid.UUID]trackedWork // TODO: done, aggregate stats, queue stats, scheduler feedback } @@ -57,72 +58,67 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) { delete(wt.running, callID) } -func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { - return func(callID storiface.CallID, err error) (storiface.CallID, error) { - if err != nil { - return callID, err +func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) { + tracked := func(rw int, callID storiface.CallID) trackedWork { + return trackedWork{ + job: storiface.WorkerJob{ + ID: callID, + Sector: sid.ID, + Task: task, + Start: time.Now(), + RunWait: rw, + }, + worker: wid, + workerHostname: wi.Hostname, } + } - wt.lk.Lock() - defer wt.lk.Unlock() + select { + case <-ready: + case <-ctx.Done(): + return storiface.UndefCall, ctx.Err() + default: + prepID := uuid.New() - _, done := wt.done[callID] - if done { - delete(wt.done, callID) - return callID, err - } - - tracked := func(rw int) trackedWork { - return trackedWork{ - job: storiface.WorkerJob{ - ID: callID, - Sector: sid.ID, - Task: task, - Start: time.Now(), - RunWait: rw, - }, - worker: wid, - workerHostname: wi.Hostname, - } - } + wt.prepared[prepID] = tracked(storiface.RWPrepared, storiface.UndefCall) + wt.lk.Unlock() select { case <-ready: case <-ctx.Done(): - return callID, ctx.Err() - default: - wt.prepared[callID] = tracked(storiface.RWPrepared) - - wt.lk.Unlock() - select { - case <-ready: - case <-ctx.Done(): - delete(wt.prepared, callID) - wt.lk.Lock() // for the deferred unlock - return callID, ctx.Err() - } - wt.lk.Lock() - _, done := wt.done[callID] - if done { - delete(wt.done, callID) - return callID, err - } - - delete(wt.prepared, callID) + delete(wt.prepared, prepID) + return storiface.UndefCall, ctx.Err() } - wt.running[callID] = tracked(storiface.RWRunning) + wt.lk.Lock() + delete(wt.prepared, prepID) + } + + callID, err := cb() + if err != nil { + return callID, err + } - ctx, _ = tag.New( - ctx, - tag.Upsert(metrics.TaskType, string(task)), - tag.Upsert(metrics.WorkerHostname, wi.Hostname), - ) - stats.Record(ctx, metrics.WorkerCallsStarted.M(1)) + wt.lk.Lock() + defer wt.lk.Unlock() + _, done := wt.done[callID] + if done { + delete(wt.done, callID) return callID, err } + + wt.running[callID] = tracked(storiface.RWRunning, callID) + + ctx, _ = tag.New( + ctx, + tag.Upsert(metrics.TaskType, string(task)), + tag.Upsert(metrics.WorkerHostname, wi.Hostname), + ) + stats.Record(ctx, metrics.WorkerCallsStarted.M(1)) + + return callID, err } func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker { @@ -168,35 +164,37 @@ func (t *trackedWorker) start() { } func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1, func() (storiface.CallID, error) { return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces) }) } func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2, func() (storiface.CallID, error) { return t.Worker.SealPreCommit2(ctx, sector, pc1o) }) } func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit1, func() (storiface.CallID, error) { return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) }) } func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTCommit2, func() (storiface.CallID, error) { return t.Worker.SealCommit2(ctx, sector, c1o) }) } func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) }) } func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) { + return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) + }) } func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, s, sealtasks.TTFetch, func() (storiface.CallID, error) { return t.Worker.Fetch(ctx, s, ft, ptype, am) }) } func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { - return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal, func() (storiface.CallID, error) { return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) }) } var _ Worker = &trackedWorker{}