diff --git a/chain/vm/execution.go b/chain/vm/execution.go index ea3a9719341..4fb626f4390 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -41,14 +41,14 @@ func newVMExecutor(vmi Interface, lane ExecutionLane) Interface { } func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { - token := execution.getToken(e.lane) + token := execution.getToken(ctx, e.lane) defer token.Done() return e.vmi.ApplyMessage(ctx, cmsg) } func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { - token := execution.getToken(e.lane) + token := execution.getToken(ctx, e.lane) defer token.Done() return e.vmi.ApplyImplicitMessage(ctx, msg) @@ -61,6 +61,7 @@ func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { type executionToken struct { lane ExecutionLane reserved int + ctx context.Context } func (token *executionToken) Done() { @@ -77,78 +78,69 @@ type executionEnv struct { reserved int } -func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { - metricsUp(metrics.VMExecutionWaiting, lane) - defer metricsDown(metrics.VMExecutionWaiting, lane) +func (e *executionEnv) getToken(ctx context.Context, lane ExecutionLane) *executionToken { + metricsUp(ctx, metrics.VMExecutionWaiting, lane) + defer metricsDown(ctx, metrics.VMExecutionWaiting, lane) e.mx.Lock() - defer e.mx.Unlock() - switch lane { - case ExecutionLaneDefault: + reserving := 0 + if lane == ExecutionLaneDefault { for e.available <= e.reserved { e.cond.Wait() } - e.available-- - - metricsUp(metrics.VMExecutionRunning, lane) - return &executionToken{lane: lane, reserved: 0} - - case ExecutionLanePriority: + } else { for e.available == 0 { e.cond.Wait() } - - e.available-- - - reserving := 0 if e.reserved > 0 { e.reserved-- reserving = 1 } + } - metricsUp(metrics.VMExecutionRunning, lane) - return &executionToken{lane: lane, reserved: reserving} + e.available-- + e.mx.Unlock() - default: - // already checked at interface boundary in NewVM, so this is appropriate - panic("bogus execution lane") - } + metricsUp(ctx, metrics.VMExecutionRunning, lane) + return &executionToken{lane: lane, reserved: reserving, ctx: ctx} } func (e *executionEnv) putToken(token *executionToken) { e.mx.Lock() - defer e.mx.Unlock() e.available++ e.reserved += token.reserved // Note: Signal is unsound, because a priority token could wake up a non-priority - // goroutnie and lead to deadlock. So Broadcast it must be. + // goroutine and lead to deadlock. So Broadcast it must be. e.cond.Broadcast() + e.mx.Unlock() - metricsDown(metrics.VMExecutionRunning, token.lane) + metricsDown(token.ctx, metrics.VMExecutionRunning, token.lane) } -func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { - metricsAdjust(metric, lane, 1) +func metricsUp(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(ctx, metric, lane, 1) } -func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) { - metricsAdjust(metric, lane, -1) +func metricsDown(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(ctx, metric, lane, -1) } -func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { - laneName := "default" +var ( + defaultLaneTag = tag.Upsert(metrics.ExecutionLane, "default") + priorityLaneTag = tag.Upsert(metrics.ExecutionLane, "priority") +) + +func metricsAdjust(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane, delta int) { + laneTag := defaultLaneTag if lane > ExecutionLaneDefault { - laneName = "priority" + laneTag = priorityLaneTag } - ctx, _ := tag.New( - context.Background(), - tag.Upsert(metrics.ExecutionLane, laneName), - ) + ctx, _ = tag.New(ctx, laneTag) stats.Record(ctx, metric.M(int64(delta))) }