Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic caused by race condition when accessing span attributes #12661

Merged
merged 6 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion processor/spanmetricsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector v0.56.0
go.opentelemetry.io/collector/pdata v0.56.0
go.opentelemetry.io/collector/semconv v0.56.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.48.0
)
Expand Down Expand Up @@ -80,7 +81,6 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
42 changes: 24 additions & 18 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -228,28 +229,33 @@ func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces)
// that should not interfere with the flow of trace data because
// it is an orthogonal concern to the trace flow (it should not impact
// upstream or downstream pipeline trace components).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parts of this comment are no longer accurate. Can you update it to reflect the new process?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated comment in: af3dc8f.

It's mostly deleting since I don't think that line of code is too controversial.

go func() {
// Since this is in a goroutine, the entire func can be locked without
// impacting trace processing performance. This also significantly
// reduces the number of locks/unlocks to manage, reducing the
// concurrency-bug surface area.
p.lock.Lock()
defer p.lock.Unlock()

p.aggregateMetrics(traces)
m, err := p.buildMetrics()
// Forward trace data unmodified.
return multierr.Combine(p.tracesToMetrics(ctx, traces), p.nextConsumer.ConsumeTraces(ctx, traces))
}

if err != nil {
p.logger.Error(err.Error())
} else if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
p.logger.Error(err.Error())
}
func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces) error {
p.lock.Lock()

p.resetExemplarData()
}()
p.aggregateMetrics(traces)
m, err := p.buildMetrics()

// Forward trace data unmodified.
return p.nextConsumer.ConsumeTraces(ctx, traces)
// Exemplars are only relevant to this batch of traces, so must be cleared within the lock,
// regardless of error while building metrics, before the next batch is received.
p.resetExemplarData()

// This component no longer needs to read the metrics once built, so it is safe to unlock.
p.lock.Unlock()

if err != nil {
return err
}

if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
return err
}

return nil
}

// buildMetrics collects the computed raw metrics data, builds the metrics object and
Expand Down
37 changes: 21 additions & 16 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/metadata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -189,18 +188,22 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
consumeTracesErr error
}{
{
name: "metricsExporter error",
consumeMetricsErr: fmt.Errorf("metricsExporter error"),
name: "ConsumeMetrics error",
consumeMetricsErr: fmt.Errorf("consume metrics error"),
},
{
name: "nextConsumer error",
consumeTracesErr: fmt.Errorf("nextConsumer error"),
name: "ConsumeTraces error",
consumeTracesErr: fmt.Errorf("consume traces error"),
},
{
name: "ConsumeMetrics and ConsumeTraces error",
consumeMetricsErr: fmt.Errorf("consume metrics error"),
consumeTracesErr: fmt.Errorf("consume traces error"),
},
} {
t.Run(tc.name, func(t *testing.T) {
// Prepare
obs, logs := observer.New(zap.ErrorLevel)
logger := zap.New(obs)
logger := zap.NewNop()

mexp := &mocks.MetricsExporter{}
mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(tc.consumeMetricsErr)
Expand All @@ -215,17 +218,19 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.ConsumeTraces(ctx, traces)
if tc.consumeTracesErr != nil {
require.Error(t, err)
assert.EqualError(t, err, tc.consumeTracesErr.Error())
return
}

// Verify
require.NoError(t, err)
assert.Eventually(t, func() bool {
return logs.FilterMessage(tc.consumeMetricsErr.Error()).Len() > 0
}, 10*time.Second, time.Millisecond*100)
require.Error(t, err)
switch {
case tc.consumeMetricsErr != nil && tc.consumeTracesErr != nil:
assert.EqualError(t, err, tc.consumeMetricsErr.Error()+"; "+tc.consumeTracesErr.Error())
case tc.consumeMetricsErr != nil:
assert.EqualError(t, err, tc.consumeMetricsErr.Error())
case tc.consumeTracesErr != nil:
assert.EqualError(t, err, tc.consumeTracesErr.Error())
default:
assert.Fail(t, "expected at least one error")
}
})
}
}
Expand Down
16 changes: 16 additions & 0 deletions unreleased/12644-fix-panic-race.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix panic caused by race condition when accessing span attributes.

# One or more tracking issues related to the change
issues: [12644]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: