diff --git a/cmd/otelcol/config/collector/agent_config.yaml b/cmd/otelcol/config/collector/agent_config.yaml index 5ef665671a..16663e0b9d 100644 --- a/cmd/otelcol/config/collector/agent_config.yaml +++ b/cmd/otelcol/config/collector/agent_config.yaml @@ -92,6 +92,7 @@ receivers: #access_token_passthrough: true zipkin: endpoint: "${SPLUNK_LISTEN_INTERFACE}:9411" + nop: processors: batch: @@ -137,6 +138,11 @@ exporters: #ingest_url: http://${SPLUNK_GATEWAY_URL}:9943 sync_host_metadata: true correlation: + # Entities (applicable only if discovery mode is enabled) + otlphttp/entities: + logs_endpoint: "${SPLUNK_INGEST_URL}/v3/event" + headers: + "X-SF-Token": "${SPLUNK_ACCESS_TOKEN}" # Logs splunk_hec: token: "${SPLUNK_HEC_TOKEN}" @@ -190,6 +196,11 @@ service: receivers: [signalfx, smartagent/processlist] processors: [memory_limiter, batch, resourcedetection] exporters: [signalfx] + logs/entities: + # Receivers are dynamically added if discovery mode is enabled + receivers: [nop] + processors: [memory_limiter, batch, resourcedetection] + exporters: [otlphttp/entities] logs: receivers: [fluentforward, otlp] processors: diff --git a/cmd/otelcol/config/collector/upstream_agent_config.yaml b/cmd/otelcol/config/collector/upstream_agent_config.yaml index 98ccf75068..d03ac3f811 100644 --- a/cmd/otelcol/config/collector/upstream_agent_config.yaml +++ b/cmd/otelcol/config/collector/upstream_agent_config.yaml @@ -86,6 +86,7 @@ receivers: endpoint: 0.0.0.0:9943 zipkin: endpoint: 0.0.0.0:9411 + nop: processors: batch: @@ -132,6 +133,11 @@ exporters: #ingest_url: http://${SPLUNK_GATEWAY_URL}:9943 sync_host_metadata: true correlation: + # Entities (applicable only if discovery mode is enabled) + otlphttp/entities: + logs_endpoint: "${SPLUNK_INGEST_URL}/v3/event" + headers: + "X-SF-Token": "${SPLUNK_ACCESS_TOKEN}" splunk_hec/profiling: token: "${SPLUNK_ACCESS_TOKEN}" endpoint: "https://ingest.${SPLUNK_REALM}.signalfx.com/v1/log" @@ -180,6 +186,11 @@ service: exporters: [signalfx] # Use instead when sending to gateway #exporters: [otlp] + logs/entities: + # Receivers are dynamically added if discovery mode is enabled + receivers: [nop] + processors: [memory_limiter, batch, resourcedetection] + exporters: [otlphttp/entities] # Required for profiling logs: receivers: [otlp] diff --git a/internal/configconverter/discovery.go b/internal/configconverter/discovery.go index 9febaa3e74..dcdb377894 100644 --- a/internal/configconverter/discovery.go +++ b/internal/configconverter/discovery.go @@ -17,6 +17,7 @@ package configconverter import ( "context" "fmt" + "strings" "go.opentelemetry.io/collector/confmap" @@ -58,7 +59,13 @@ func SetupDiscovery(_ context.Context, in *confmap.Conf) error { service["extensions"] = appendUnique(serviceExtensions, discoExtensions) } - metricsPipeline, metricsReceivers, err := getMetricsPipelineAndReceivers(service) + pipelines := map[string]any{} + if pl, ok := service["pipelines"]; ok && pl != nil { + pipelines = pl.(map[string]any) + } + service["pipelines"] = pipelines + + metricsPipeline, metricsReceivers, err := getMetricsPipelineAndReceivers(pipelines) if err != nil { return err } @@ -67,6 +74,8 @@ func SetupDiscovery(_ context.Context, in *confmap.Conf) error { metricsPipeline["receivers"] = appendUnique(metricsReceivers, discoReceivers) } + setEntitiesPipelineReceivers(pipelines, discoReceivers) + setAutoDiscoveryResourceAttribute(service) *in = *confmap.NewFromStringMap(out) @@ -135,13 +144,7 @@ func getDiscoReceivers(service map[string]any) (bool, []any, error) { return isSet, receivers, nil } -func getMetricsPipelineAndReceivers(service map[string]any) (map[string]any, []any, error) { - pipelines := map[string]any{} - if pl, ok := service["pipelines"]; ok && pl != nil { - pipelines = pl.(map[string]any) - } - service["pipelines"] = pipelines - +func getMetricsPipelineAndReceivers(pipelines map[string]any) (map[string]any, []any, error) { metricsPipeline := map[string]any{} if mp, ok := pipelines["metrics"]; ok && mp != nil { metricsPipeline = mp.(map[string]any) @@ -158,6 +161,25 @@ func getMetricsPipelineAndReceivers(service map[string]any) (map[string]any, []a return metricsPipeline, metricsReceivers, nil } +func setEntitiesPipelineReceivers(pipelines map[string]any, discoReceivers []any) { + ep, ok := pipelines["logs/entities"].(map[string]any) + if !ok { + // Entities pipeline not set, nothing to do. + return + } + + var receivers []any + if existing, ok := ep["receivers"]; ok && existing != nil { + receivers = existing.([]any) + } + for _, r := range discoReceivers { + if strings.HasPrefix(r.(string), "discovery") { + receivers = appendUnique(receivers, []any{r}) + } + } + ep["receivers"] = receivers +} + func appendUnique(serviceComponents []any, discoComponents []any) []any { existing := map[any]struct{}{} for _, e := range serviceComponents { diff --git a/internal/configconverter/discovery_test.go b/internal/configconverter/discovery_test.go index b6c2089336..5239bdda12 100644 --- a/internal/configconverter/discovery_test.go +++ b/internal/configconverter/discovery_test.go @@ -226,6 +226,87 @@ func TestDiscoveryEmptyReceivers(t *testing.T) { require.Equal(t, expected.ToStringMap(), in.ToStringMap()) } +func TestContinuousDiscoveryNoEntitiesPipeline(t *testing.T) { + in := confFromYaml(t, `service: + pipelines: + metrics: + processors: [proc/one, proc/two, proc/three] + exporters: [exp/one, exp/two, exp/three] + metrics/untouched: + receivers: [recv/six, recv/seven, recv/eight] + processors: [proc/six, proc/seven, proc/eight] + exporters: [exp/six, exp/seven, exp/eight] + logs/untouched: + receivers: [recv/six] + processors: [proc/six] + exporters: [exp/six] + receivers/splunk.discovery: [discovery/host_observer] +`) + + expected := confFromYaml(t, `service: + pipelines: + metrics: + receivers: [discovery/host_observer] + processors: [proc/one, proc/two, proc/three] + exporters: [exp/one, exp/two, exp/three] + metrics/untouched: + receivers: [recv/six, recv/seven, recv/eight] + processors: [proc/six, proc/seven, proc/eight] + exporters: [exp/six, exp/seven, exp/eight] + logs/untouched: + receivers: [recv/six] + processors: [proc/six] + exporters: [exp/six] + telemetry: + resource: + splunk_autodiscovery: "true" +`) + + require.NoError(t, SetupDiscovery(context.Background(), in)) + require.Equal(t, expected.ToStringMap(), in.ToStringMap()) +} + +func TestContinuousDiscoveryWithEntitiesPipeline(t *testing.T) { + in := confFromYaml(t, `service: + pipelines: + metrics: + receivers: [recv/one] + processors: [proc/one, proc/two, proc/three] + exporters: [exp/one, exp/two, exp/three] + logs/entities: + receivers: [recv/one, recv/two] + processors: [proc/one, proc/two] + exporters: [exp/one, exp/two] + logs/untouched: + receivers: [recv/three] + processors: [proc/three] + exporters: [exp/three] + receivers/splunk.discovery: [discovery/one, discovery/two] +`) + + expected := confFromYaml(t, `service: + pipelines: + metrics: + receivers: [recv/one, discovery/one, discovery/two] + processors: [proc/one, proc/two, proc/three] + exporters: [exp/one, exp/two, exp/three] + logs/entities: + receivers: [recv/one, recv/two, discovery/one, discovery/two] + processors: [proc/one, proc/two] + exporters: [exp/one, exp/two] + logs/untouched: + receivers: [recv/three] + processors: [proc/three] + exporters: [exp/three] + telemetry: + resource: + splunk_autodiscovery: "true" +`) + + require.NoError(t, SetupDiscovery(context.Background(), in)) + require.Equal(t, expected.ToStringMap(), in.ToStringMap()) +} + func confFromYaml(t testing.TB, content string) *confmap.Conf { var conf map[string]any if err := yaml.Unmarshal([]byte(content), &conf); err != nil { diff --git a/internal/confmapprovider/discovery/discoverer.go b/internal/confmapprovider/discovery/discoverer.go index 8e3b7c7dcf..f673e06503 100644 --- a/internal/confmapprovider/discovery/discoverer.go +++ b/internal/confmapprovider/discovery/discoverer.go @@ -352,24 +352,9 @@ func (d *discoverer) continuousDiscoveryConfig(cfg *Config, discoveryReceiversCo dCfg := map[string]any{ "extensions": extensions, "receivers": discoveryReceiversConfigs, - "exporters": map[string]any{ - "otlphttp/entities": map[string]any{ - "logs_endpoint": "https://ingest.${SPLUNK_REALM}.signalfx.com/v3/event", - "headers": map[string]any{ - "X-SF-Token": "${SPLUNK_ACCESS_TOKEN}", - }, - }, - }, "service": map[string]any{ discovery.DiscoReceiversKey: receiverIDs, discovery.DiscoExtensionsKey: observerIDs, - "pipelines": map[string]any{ - "logs/entities": map[string]any{ - "receivers": receiverIDs, - // TODO: add processors dynamically in the converter: memory_limiter, resource (in k8s only). - "exporters": []string{"otlphttp/entities"}, - }, - }, }, } d.logger.Debug("determined continuous discovery config", zap.Any("config", dCfg)) diff --git a/internal/confmapprovider/discovery/provider_test.go b/internal/confmapprovider/discovery/provider_test.go index 75f1e6dfde..5c27bad6c3 100644 --- a/internal/confmapprovider/discovery/provider_test.go +++ b/internal/confmapprovider/discovery/provider_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/envprovider" + "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/exporter/otlphttpexporter" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/otelcol" @@ -110,18 +111,20 @@ func TestConfigDProviderInvalidURIs(t *testing.T) { func TestDiscoveryProvider_ContinuousDiscoveryConfig(t *testing.T) { require.NoError(t, featuregate.GlobalRegistry().Set(continuousDiscoveryFGKey, true)) - t.Setenv("SPLUNK_REALM", "fake-realm") + t.Setenv("SPLUNK_INGEST_URL", "https://ingest.fake-realm.signalfx.com") t.Setenv("SPLUNK_ACCESS_TOKEN", "fake-token") confmapProvider, err := New() require.NoError(t, err) - cfgDir, err := filepath.Abs(filepath.Join(".", "testdata", "config.d")) - require.NoError(t, err) provider, err := otelcol.NewConfigProvider(otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ - URIs: []string{fmt.Sprintf("%s:%s", discoveryModeScheme, cfgDir)}, + URIs: []string{ + fmt.Sprintf("file:%s", filepath.Join("testdata", "base-config.yaml")), + fmt.Sprintf("%s:%s", discoveryModeScheme, filepath.Join("testdata", "config.d")), + }, ProviderFactories: []confmap.ProviderFactory{ + fileprovider.NewFactory(), confmapProvider.DiscoveryModeProviderFactory(), envprovider.NewFactory(), }, diff --git a/internal/confmapprovider/discovery/testdata/base-config.yaml b/internal/confmapprovider/discovery/testdata/base-config.yaml new file mode 100644 index 0000000000..266bd731c8 --- /dev/null +++ b/internal/confmapprovider/discovery/testdata/base-config.yaml @@ -0,0 +1,9 @@ +exporters: + otlphttp/entities: + logs_endpoint: "${SPLUNK_INGEST_URL}/v3/event" + headers: + "X-SF-Token": "${SPLUNK_ACCESS_TOKEN}" +service: + pipelines: + logs/entities: + exporters: [otlphttp/entities] diff --git a/tests/general/default_config_test.go b/tests/general/default_config_test.go index b5e0b6901a..a8a3d7a7cf 100644 --- a/tests/general/default_config_test.go +++ b/tests/general/default_config_test.go @@ -274,6 +274,12 @@ func TestDefaultAgentConfig(t *testing.T) { "token": "", "log_data_enabled": false, }, + "otlphttp/entities": map[string]any{ + "logs_endpoint": "https://ingest.not.real.signalfx.com/v3/event", + "headers": map[string]any{ + "X-SF-Token": "", + }, + }, }, "extensions": map[string]any{ "health_check": map[string]any{"endpoint": fmt.Sprintf("%s:13133", ip)}, @@ -362,7 +368,8 @@ func TestDefaultAgentConfig(t *testing.T) { }, "signalfx": map[string]any{"endpoint": fmt.Sprintf("%s:9943", ip)}, "smartagent/processlist": map[string]any{"type": "processlist"}, - "zipkin": map[string]any{"endpoint": fmt.Sprintf("%s:9411", ip)}}, + "zipkin": map[string]any{"endpoint": fmt.Sprintf("%s:9411", ip)}, + "nop": nil}, "service": map[string]any{ "telemetry": map[string]any{"metrics": map[string]any{"address": fmt.Sprintf("%s:8888", ip)}}, "extensions": []any{"health_check", "http_forwarder", "zpages", "smartagent"}, @@ -388,6 +395,11 @@ func TestDefaultAgentConfig(t *testing.T) { "processors": []any{"memory_limiter", "batch", "resourcedetection"}, "receivers": []any{"jaeger", "otlp", "zipkin"}, }, + "logs/entities": map[string]any{ + "receivers": []any{"nop"}, + "processors": []any{"memory_limiter", "batch", "resourcedetection"}, + "exporters": []any{"otlphttp/entities"}, + }, }, }, }, config)