Skip to content

Commit

Permalink
storagemgr: Cleanup workerLk around worker resources
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 15, 2021
1 parent aab42dd commit 4288a47
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
8 changes: 4 additions & 4 deletions extern/sector-storage/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ type workerHandle struct {

info storiface.WorkerInfo

preparing *activeResources
active *activeResources
preparing *activeResources // use with workerHandle.lk
active *activeResources // use with workerHandle.lk

lk sync.Mutex
lk sync.Mutex // can be taken inside sched.workersLk.RLock

wndLk sync.Mutex
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*schedWindow

enabled bool
Expand Down
12 changes: 4 additions & 8 deletions extern/sector-storage/sched_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,11 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
go func() {
// first run the prepare step (e.g. fetching sector data from other worker)
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
sh.workersLk.Lock()
w.lk.Lock()

if err != nil {
w.lk.Lock()
w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
sh.workersLk.Unlock()

select {
case taskDone <- struct{}{}:
Expand All @@ -424,12 +422,10 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
}

// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.info, needRes, &sh.workersLk, func() error {
w.lk.Lock()
err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error {
w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
sh.workersLk.Unlock()
defer sh.workersLk.Lock() // we MUST return locked from this function
defer w.lk.Lock() // we MUST return locked from this function

select {
case taskDone <- struct{}{}:
Expand All @@ -450,7 +446,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
return nil
})

sh.workersLk.Unlock()
w.lk.Lock()

// This error should always be nil, since nothing is setting it, but just to be safe:
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions extern/sector-storage/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
out := map[uuid.UUID]storiface.WorkerStats{}

for id, handle := range m.sched.workers {
handle.lk.Lock()
out[uuid.UUID(id)] = storiface.WorkerStats{
Info: handle.info,
Enabled: handle.enabled,
Expand All @@ -24,6 +25,7 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
GpuUsed: handle.active.gpuUsed,
CpuUse: handle.active.cpuUse,
}
handle.lk.Unlock()
}

return out
Expand Down

0 comments on commit 4288a47

Please sign in to comment.