Skip to content

Commit

Permalink
Fix bug where MutatesData would not correctly propogate through conne…
Browse files Browse the repository at this point in the history
…ctors
  • Loading branch information
djaglowski committed Dec 7, 2023
1 parent dc28ec1 commit 6a03bda
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 60 deletions.
25 changes: 25 additions & 0 deletions .chloggen/connectors-propogate-mutates-data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix bug where MutatesData would not correctly propagate through connectors.

# One or more tracking issues or pull requests related to the change
issues: [9053]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type logsConsumer struct {
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0}
}

// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
Expand Down
14 changes: 10 additions & 4 deletions internal/fanoutconsumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func TestLogsNotMultiplexing(t *testing.T) {
assert.Same(t, nop, lfc)
}

func TestLogsNotMultiplexingMutating(t *testing.T) {
p := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
lfc := NewLogs([]consumer.Logs{p})
assert.True(t, lfc.Capabilities().MutatesData)
}

func TestLogsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.LogsSink)
p2 := new(consumertest.LogsSink)
Expand Down Expand Up @@ -68,7 +74,7 @@ func TestLogsMultiplexingMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -105,7 +111,7 @@ func TestReadOnlyLogsMultiplexingMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ldOrig := testdata.GenerateLogs(1)
ld := testdata.GenerateLogs(1)
ld.MarkReadOnly()
Expand Down Expand Up @@ -142,7 +148,7 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -180,7 +186,7 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.LogsSink)

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down
2 changes: 1 addition & 1 deletion internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type metricsConsumer struct {
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
return consumer.Capabilities{MutatesData: len(msc.mutable) > 0}
}

// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
Expand Down
14 changes: 10 additions & 4 deletions internal/fanoutconsumer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func TestMetricsNotMultiplexing(t *testing.T) {
assert.Same(t, nop, mfc)
}

func TestMetricssNotMultiplexingMutating(t *testing.T) {
p := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}
lfc := NewMetrics([]consumer.Metrics{p})
assert.True(t, lfc.Capabilities().MutatesData)
}

func TestMetricsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.MetricsSink)
p2 := new(consumertest.MetricsSink)
Expand Down Expand Up @@ -68,7 +74,7 @@ func TestMetricsMultiplexingMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -105,7 +111,7 @@ func TestReadOnlyMetricsMultiplexingMixFirstMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
mdOrig := testdata.GenerateMetrics(1)
md := testdata.GenerateMetrics(1)
md.MarkReadOnly()
Expand Down Expand Up @@ -142,7 +148,7 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -180,7 +186,7 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.MetricsSink)

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down
2 changes: 1 addition & 1 deletion internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type tracesConsumer struct {
}

func (tsc *tracesConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0}
}

// ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one.
Expand Down
14 changes: 10 additions & 4 deletions internal/fanoutconsumer/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func TestTracesNotMultiplexing(t *testing.T) {
assert.Same(t, nop, tfc)
}

func TestTracesNotMultiplexingMutating(t *testing.T) {
p := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}
lfc := NewTraces([]consumer.Traces{p})
assert.True(t, lfc.Capabilities().MutatesData)
}

func TestTracesMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.TracesSink)
p2 := new(consumertest.TracesSink)
Expand Down Expand Up @@ -68,7 +74,7 @@ func TestTracesMultiplexingMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -105,7 +111,7 @@ func TestReadOnlyTracesMultiplexingMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)

tdOrig := testdata.GenerateTraces(1)
td := testdata.GenerateTraces(1)
Expand Down Expand Up @@ -143,7 +149,7 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -181,7 +187,7 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.TracesSink)

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down
6 changes: 4 additions & 2 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
case *connectorNode:
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
case *capabilitiesNode:
capability := consumer.Capabilities{MutatesData: false}
capability := consumer.Capabilities{
// The fanOutNode represents the aggregate capabilities of the exporters in the pipeline.
MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData,
}
for _, proc := range g.pipelines[n.pipelineID].processors {
capability.MutatesData = capability.MutatesData || proc.getConsumer().Capabilities().MutatesData
}
Expand Down Expand Up @@ -319,7 +322,6 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
case component.DataTypeMetrics:
consumers := make([]consumer.Metrics, 0, len(nexts))
for _, next := range nexts {

consumers = append(consumers, next.(consumer.Metrics))
}
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
Expand Down
Loading

0 comments on commit 6a03bda

Please sign in to comment.