From 7ec38e5c1992da98d832a048e2eb0da8065bbed5 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Mon, 11 Dec 2023 10:04:37 -0800 Subject: [PATCH] Fanout consumer does not need to mutate in some cases (#9062) Follow up to https://github.com/open-telemetry/opentelemetry-collector/pull/9053. @dmitryax pointed out [here](https://github.com/open-telemetry/opentelemetry-collector/pull/9053#discussion_r1420871665) that the fanout consumer will pass original data to a non-mutating consumer if any is available. This PR incorporates that point and updates test expectations accordingly. --- internal/fanoutconsumer/logs.go | 3 ++- internal/fanoutconsumer/logs_test.go | 4 ++-- internal/fanoutconsumer/metrics.go | 3 ++- internal/fanoutconsumer/metrics_test.go | 4 ++-- internal/fanoutconsumer/traces.go | 3 ++- internal/fanoutconsumer/traces_test.go | 4 ++-- service/internal/graph/graph_test.go | 10 +++++----- 7 files changed, 17 insertions(+), 14 deletions(-) diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index fb2cf90aa0b..5047fbbf70a 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -44,7 +44,8 @@ type logsConsumer struct { } func (lsc *logsConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0} + // If all consumers are mutating, then the original data will be passed to one of them. + return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0 && len(lsc.readonly) == 0} } // ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one. diff --git a/internal/fanoutconsumer/logs_test.go b/internal/fanoutconsumer/logs_test.go index aedbea28e91..472239b3456 100644 --- a/internal/fanoutconsumer/logs_test.go +++ b/internal/fanoutconsumer/logs_test.go @@ -148,7 +148,7 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) { p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} lfc := NewLogs([]consumer.Logs{p1, p2, p3}) - assert.True(t, lfc.Capabilities().MutatesData) + assert.False(t, lfc.Capabilities().MutatesData) ld := testdata.GenerateLogs(1) for i := 0; i < 2; i++ { @@ -186,7 +186,7 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) { p3 := new(consumertest.LogsSink) lfc := NewLogs([]consumer.Logs{p1, p2, p3}) - assert.True(t, lfc.Capabilities().MutatesData) + assert.False(t, lfc.Capabilities().MutatesData) ld := testdata.GenerateLogs(1) for i := 0; i < 2; i++ { diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index 039830abf47..023db2a70e3 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -42,7 +42,8 @@ type metricsConsumer struct { } func (msc *metricsConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: len(msc.mutable) > 0} + // If all consumers are mutating, then the original data will be passed to one of them. + return consumer.Capabilities{MutatesData: len(msc.mutable) > 0 && len(msc.readonly) == 0} } // ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one. diff --git a/internal/fanoutconsumer/metrics_test.go b/internal/fanoutconsumer/metrics_test.go index 37fde6ad52a..cf6455d0d07 100644 --- a/internal/fanoutconsumer/metrics_test.go +++ b/internal/fanoutconsumer/metrics_test.go @@ -148,7 +148,7 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) { p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) - assert.True(t, mfc.Capabilities().MutatesData) + assert.False(t, mfc.Capabilities().MutatesData) md := testdata.GenerateMetrics(1) for i := 0; i < 2; i++ { @@ -186,7 +186,7 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) { p3 := new(consumertest.MetricsSink) mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) - assert.True(t, mfc.Capabilities().MutatesData) + assert.False(t, mfc.Capabilities().MutatesData) md := testdata.GenerateMetrics(1) for i := 0; i < 2; i++ { diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index 69dac49ddca..e13068a656f 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -42,7 +42,8 @@ type tracesConsumer struct { } func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0} + // If all consumers are mutating, then the original data will be passed to one of them. + return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0 && len(tsc.readonly) == 0} } // ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one. diff --git a/internal/fanoutconsumer/traces_test.go b/internal/fanoutconsumer/traces_test.go index f04084882dc..966fdcf627b 100644 --- a/internal/fanoutconsumer/traces_test.go +++ b/internal/fanoutconsumer/traces_test.go @@ -149,7 +149,7 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) { p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} tfc := NewTraces([]consumer.Traces{p1, p2, p3}) - assert.True(t, tfc.Capabilities().MutatesData) + assert.False(t, tfc.Capabilities().MutatesData) td := testdata.GenerateTraces(1) for i := 0; i < 2; i++ { @@ -187,7 +187,7 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) { p3 := new(consumertest.TracesSink) tfc := NewTraces([]consumer.Traces{p1, p2, p3}) - assert.True(t, tfc.Capabilities().MutatesData) + assert.False(t, tfc.Capabilities().MutatesData) td := testdata.GenerateTraces(1) for i := 0; i < 2; i++ { diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index cea340a2f51..b4aee55cc4c 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -573,17 +573,17 @@ func TestConnectorPipelinesGraph(t *testing.T) { pipelineConfigs: pipelines.Config{ component.NewIDWithName("traces", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, - Processors: []component.ID{component.NewID("exampleprocessor")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("metrics", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("logs", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, - Processors: []component.ID{component.NewID("exampleprocessor")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("traces", "out"): { @@ -593,12 +593,12 @@ func TestConnectorPipelinesGraph(t *testing.T) { }, component.NewIDWithName("metrics", "out"): { Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, - Processors: []component.ID{component.NewID("exampleprocessor")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewID("exampleexporter")}, }, component.NewIDWithName("logs", "out"): { Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewID("exampleexporter")}, }, },