From 94134cc0c428ac279c05f75f6fff45258cd40315 Mon Sep 17 00:00:00 2001 From: Radek Simko Date: Thu, 4 Feb 2021 10:07:47 +0000 Subject: [PATCH] module loader: Avoid deadlock --- internal/terraform/module/module_loader.go | 28 +++++++++---------- internal/terraform/module/module_ops_queue.go | 21 ++++---------- .../terraform/module/module_ops_queue_test.go | 4 +-- 3 files changed, 22 insertions(+), 31 deletions(-) diff --git a/internal/terraform/module/module_loader.go b/internal/terraform/module/module_loader.go index c291c8a3..cad305aa 100644 --- a/internal/terraform/module/module_loader.go +++ b/internal/terraform/module/module_loader.go @@ -85,17 +85,17 @@ func (ml *moduleLoader) run(ctx context.Context) { if nextOp.Module.HasOpenFiles() && ml.prioCapacity() > 0 { atomic.AddInt64(ml.prioLoadingCount, 1) - mod := ml.queue.PopOp() go func(ml *moduleLoader) { - defer atomic.AddInt64(ml.prioLoadingCount, -1) - ml.executeModuleOp(ctx, mod) + ml.executeModuleOp(ctx, nextOp) + atomic.AddInt64(ml.prioLoadingCount, -1) + ml.tryDispatchingModuleOp() }(ml) } else if ml.nonPrioCapacity() > 0 { atomic.AddInt64(ml.loadingCount, 1) - mod := ml.queue.PopOp() go func(ml *moduleLoader) { - defer atomic.AddInt64(ml.loadingCount, -1) - ml.executeModuleOp(ctx, mod) + ml.executeModuleOp(ctx, nextOp) + atomic.AddInt64(ml.loadingCount, -1) + ml.tryDispatchingModuleOp() }(ml) } else { // Account for an unlikely situation where next operation @@ -105,7 +105,8 @@ func (ml *moduleLoader) run(ctx context.Context) { // were decremented. ml.logger.Println("no available capacity, retrying dispatch") time.Sleep(100 * time.Millisecond) - ml.tryDispatchingModuleOp() + ml.queue.PushOp(nextOp) + go ml.tryDispatchingModuleOp() } } } @@ -113,13 +114,13 @@ func (ml *moduleLoader) run(ctx context.Context) { func (ml *moduleLoader) tryDispatchingModuleOp() { totalCapacity := ml.nonPrioCapacity() + ml.prioCapacity() - opsInQueue := ml.queue.Len() // Keep scheduling work from queue if we have capacity - if opsInQueue > 0 && totalCapacity > 0 { - item := ml.queue.Peek() - nextModOp := item.(ModuleOperation) - ml.opsToDispatch <- nextModOp + if totalCapacity > 0 { + nextModOp, ok := ml.queue.PopOp() + if ok { + ml.opsToDispatch <- nextModOp + } } } @@ -128,7 +129,7 @@ func (ml *moduleLoader) prioCapacity() int64 { } func (ml *moduleLoader) nonPrioCapacity() int64 { - return ml.prioParallelism - atomic.LoadInt64(ml.loadingCount) + return ml.nonPrioParallelism - atomic.LoadInt64(ml.loadingCount) } func (ml *moduleLoader) executeModuleOp(ctx context.Context, modOp ModuleOperation) { @@ -136,7 +137,6 @@ func (ml *moduleLoader) executeModuleOp(ctx context.Context, modOp ModuleOperati // TODO: Report progress in % for each op based on queue length defer ml.logger.Printf("finished %q for %s", modOp.Type, modOp.Module.Path()) defer modOp.markAsDone() - defer ml.tryDispatchingModuleOp() switch modOp.Type { case OpTypeGetTerraformVersion: diff --git a/internal/terraform/module/module_ops_queue.go b/internal/terraform/module/module_ops_queue.go index f4d0677a..798b9910 100644 --- a/internal/terraform/module/module_ops_queue.go +++ b/internal/terraform/module/module_ops_queue.go @@ -27,13 +27,17 @@ func (q *moduleOpsQueue) PushOp(op ModuleOperation) { } -func (q *moduleOpsQueue) PopOp() ModuleOperation { +func (q *moduleOpsQueue) PopOp() (ModuleOperation, bool) { q.mu.Lock() defer q.mu.Unlock() + if q.q.Len() == 0 { + return ModuleOperation{}, false + } + item := heap.Pop(&q.q) modOp := item.(ModuleOperation) - return modOp + return modOp, true } func (q *moduleOpsQueue) Len() int { @@ -43,14 +47,6 @@ func (q *moduleOpsQueue) Len() int { return q.q.Len() } -func (q *moduleOpsQueue) Peek() interface{} { - q.mu.Lock() - defer q.mu.Unlock() - - item := q.q.Peek() - return item -} - type queue []ModuleOperation var _ heap.Interface = &queue{} @@ -72,11 +68,6 @@ func (q *queue) Pop() interface{} { return item } -func (q queue) Peek() interface{} { - n := len(q) - return q[n-1] -} - func (q queue) Len() int { return len(q) } diff --git a/internal/terraform/module/module_ops_queue_test.go b/internal/terraform/module/module_ops_queue_test.go index d43c06b7..a766215e 100644 --- a/internal/terraform/module/module_ops_queue_test.go +++ b/internal/terraform/module/module_ops_queue_test.go @@ -45,7 +45,7 @@ func TestModuleOpsQueue_modulePriority(t *testing.T) { mq.PushOp(op) } - firstOp := mq.PopOp() + firstOp, _ := mq.PopOp() expectedFirstPath := filepath.Join(dir, "beta") firstPath := firstOp.Module.Path() @@ -54,7 +54,7 @@ func TestModuleOpsQueue_modulePriority(t *testing.T) { expectedFirstPath, firstPath) } - secondOp := mq.PopOp() + secondOp, _ := mq.PopOp() expectedSecondPath := filepath.Join(dir, "gamma") secondPath := secondOp.Module.Path() if secondPath != expectedSecondPath {