diff --git a/.chloggen/codeboten_may-29-otlp-export.yaml b/.chloggen/codeboten_may-29-otlp-export.yaml new file mode 100755 index 00000000000..c2cc4d1fef2 --- /dev/null +++ b/.chloggen/codeboten_may-29-otlp-export.yaml @@ -0,0 +1,20 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable configuration of collector telemetry through prometheus reader + +# One or more tracking issues or pull requests related to the change +issues: [7641] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + These options are still experimental. To enable them, users must enable both + `telemetry.useOtelForInternalMetrics` and `telemetry.useOtelWithSDKConfigurationForInternalTelemetry` + feature gates. This change updates `metric_readers` to `readers` to align with the configuration + working group. diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index 2daadd8f71b..5fb69d00177 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -53,6 +53,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.1-0.20230612162650-64be7e574a17 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect + go.opentelemetry.io/otel/sdk v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 837ce0c528d..497d1ad304c 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -296,6 +296,7 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1 go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index 40b2d70eb78..6dc715efca5 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -58,6 +58,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect + go.opentelemetry.io/otel/sdk v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index cc2949fa25b..d2d4c81e0e2 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -975,6 +975,7 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1 go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= diff --git a/extension/zpagesextension/go.mod b/extension/zpagesextension/go.mod index f5c216fcaa2..ce17f4c6893 100644 --- a/extension/zpagesextension/go.mod +++ b/extension/zpagesextension/go.mod @@ -27,6 +27,7 @@ require ( github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.81.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0013 // indirect go.opentelemetry.io/collector/pdata v1.0.0-rcv0013 // indirect diff --git a/extension/zpagesextension/go.sum b/extension/zpagesextension/go.sum index 1cabf3a5374..0d3125ffc94 100644 --- a/extension/zpagesextension/go.sum +++ b/extension/zpagesextension/go.sum @@ -66,6 +66,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -90,6 +92,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -233,11 +236,16 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -246,6 +254,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1 go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/zpages v0.42.0 h1:hFscXKQ9PTjyIVmAr6zIV8cMoiEeR9lPIwPVqHi8+5Q= go.opentelemetry.io/contrib/zpages v0.42.0/go.mod h1:qRJBEfB0iwRKrYImq5qfwTolmY8HXvZBRucvhuTVQZw= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= @@ -294,6 +304,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= @@ -385,6 +396,7 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI= google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 8f573b8ba37..427e51d2e19 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -7,11 +7,14 @@ import ( "net" "os/exec" "runtime" + "strconv" "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/service/telemetry" ) type portpair struct { @@ -54,6 +57,22 @@ func GetAvailableLocalAddress(t testing.TB) string { return endpoint } +func GetAvailableLocalAddressPrometheus(t testing.TB) *telemetry.Prometheus { + address := GetAvailableLocalAddress(t) + host, port, err := net.SplitHostPort(address) + if err != nil { + return nil + } + portInt, err := strconv.Atoi(port) + if err != nil { + return nil + } + return &telemetry.Prometheus{ + Host: &host, + Port: &portInt, + } +} + func findAvailableAddress(t testing.TB) string { ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err, "Failed to get a free local port") diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go index f3467d78475..27d7d8c8454 100644 --- a/service/internal/proctelemetry/config.go +++ b/service/internal/proctelemetry/config.go @@ -4,7 +4,16 @@ package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry" import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/bridge/opencensus" + otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -12,6 +21,7 @@ import ( "go.opentelemetry.io/collector/obsreport" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" + "go.opentelemetry.io/collector/service/telemetry" ) const ( @@ -38,6 +48,13 @@ var ( } ) +func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { + if reader.Pull != nil { + return initExporter(ctx, reader.Pull.Exporter, asyncErrorChannel) + } + return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader) +} + func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) { opts := []sdkmetric.Option{ sdkmetric.WithResource(res), @@ -50,6 +67,21 @@ func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disab ), nil } +func InitPrometheusServer(registry *prometheus.Registry, address string, asyncErrorChannel chan error) *http.Server { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + server := &http.Server{ + Addr: address, + Handler: mux, + } + go func() { + if serveErr := server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { + asyncErrorChannel <- serveErr + } + }() + return server +} + func batchViews(disableHighCardinality bool) []sdkmetric.View { views := []sdkmetric.View{ sdkmetric.NewView( @@ -92,3 +124,33 @@ func cardinalityFilter(kvs ...attribute.KeyValue) attribute.Filter { return !filter.HasValue(kv.Key) } } + +func initPrometheusExporter(prometheusConfig *telemetry.Prometheus, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { + promRegistry := prometheus.NewRegistry() + if prometheusConfig.Host == nil { + return nil, nil, fmt.Errorf("host must be specified") + } + if prometheusConfig.Port == nil { + return nil, nil, fmt.Errorf("port must be specified") + } + wrappedRegisterer := prometheus.WrapRegistererWithPrefix("otelcol_", promRegistry) + exporter, err := otelprom.New( + otelprom.WithRegisterer(wrappedRegisterer), + // https://github.com/open-telemetry/opentelemetry-collector/issues/8043 + otelprom.WithoutUnits(), + // Disabled for the moment until this becomes stable, and we are ready to break backwards compatibility. + otelprom.WithoutScopeInfo()) + if err != nil { + return nil, nil, fmt.Errorf("error creating otel prometheus exporter: %w", err) + } + + exporter.RegisterProducer(opencensus.NewMetricProducer()) + return exporter, InitPrometheusServer(promRegistry, fmt.Sprintf("%s:%d", *prometheusConfig.Host, *prometheusConfig.Port), asyncErrorChannel), nil +} + +func initExporter(_ context.Context, exporter telemetry.MetricExporter, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { + if exporter.Prometheus != nil { + return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel) + } + return nil, nil, fmt.Errorf("no valid exporter") +} diff --git a/service/internal/proctelemetry/config_test.go b/service/internal/proctelemetry/config_test.go index 01e368b2898..29127c58b1b 100644 --- a/service/internal/proctelemetry/config_test.go +++ b/service/internal/proctelemetry/config_test.go @@ -2,3 +2,89 @@ // SPDX-License-Identifier: Apache-2.0 package proctelemetry + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/service/telemetry" +) + +func strPtr(s string) *string { + return &s +} + +func intPtr(i int) *int { + return &i +} + +func TestMetricReader(t *testing.T) { + testCases := []struct { + name string + reader telemetry.MetricReader + args any + err error + }{ + { + name: "noreader", + err: errors.New("unsupported metric reader type { }"), + }, + { + name: "pull prometheus invalid exporter", + reader: telemetry.MetricReader{ + Pull: &telemetry.PullMetricReader{ + Exporter: telemetry.MetricExporter{ + Otlp: &telemetry.OtlpMetric{}, + }, + }, + }, + err: errors.New("no valid exporter"), + }, + { + name: "pull/prometheus-invalid-config-no-host", + reader: telemetry.MetricReader{ + Pull: &telemetry.PullMetricReader{ + Exporter: telemetry.MetricExporter{ + Prometheus: &telemetry.Prometheus{}, + }, + }, + }, + err: errors.New("host must be specified"), + }, + { + name: "pull/prometheus-invalid-config-no-port", + reader: telemetry.MetricReader{ + Pull: &telemetry.PullMetricReader{ + Exporter: telemetry.MetricExporter{ + Prometheus: &telemetry.Prometheus{ + Host: strPtr("locahost"), + }, + }, + }, + }, + err: errors.New("port must be specified"), + }, + { + name: "pull/prometheus-invalid-config-no-port", + reader: telemetry.MetricReader{ + Pull: &telemetry.PullMetricReader{ + Exporter: telemetry.MetricExporter{ + Prometheus: &telemetry.Prometheus{ + Host: strPtr("locahost"), + Port: intPtr(8080), + }, + }, + }, + }, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + _, _, err := InitMetricReader(context.Background(), tt.reader, make(chan error)) + assert.Equal(t, tt.err, err) + }) + } +} diff --git a/service/telemetry.go b/service/telemetry.go index 1cb8b232ad1..519e6bf987a 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -4,22 +4,21 @@ package service // import "go.opentelemetry.io/collector/service" import ( + "context" "errors" - "fmt" + "net" "net/http" + "strconv" "strings" "unicode" ocprom "contrib.go.opencensus.io/exporter/prometheus" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ocmetric "go.opencensus.io/metric" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats/view" "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/bridge/opencensus" - otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/propagation" @@ -69,7 +68,7 @@ func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig b } func (tel *telemetryInitializer) init(res *resource.Resource, settings component.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { - if cfg.Metrics.Level == configtelemetry.LevelNone || cfg.Metrics.Address == "" { + if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) { settings.Logger.Info( "Skipping telemetry setup.", zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address), @@ -86,44 +85,72 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component return err } - return tel.initPrometheus(res, settings.Logger, cfg.Metrics.Address, cfg.Metrics.Level, asyncErrorChannel) + return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel) } -func (tel *telemetryInitializer) initPrometheus(res *resource.Resource, logger *zap.Logger, address string, level configtelemetry.Level, asyncErrorChannel chan error) error { - promRegistry := prometheus.NewRegistry() - if tel.useOtel { - if err := tel.initOpenTelemetry(res, promRegistry); err != nil { +func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { + // Initialize the ocRegistry, still used by the process metrics. + tel.ocRegistry = ocmetric.NewRegistry() + if !tel.useOtel && !tel.extendedConfig { + return tel.initOpenCensus(res, logger, cfg.Metrics.Address, cfg.Metrics.Level, asyncErrorChannel) + } + + if len(cfg.Metrics.Address) != 0 { + if tel.extendedConfig { + logger.Warn("service::telemetry::metrics::address is being deprecated in favor of service::telemetry::metrics::readers") + } + host, port, err := net.SplitHostPort(cfg.Metrics.Address) + if err != nil { return err } - } else { - if err := tel.initOpenCensus(level, res, promRegistry); err != nil { + portInt, err := strconv.Atoi(port) + if err != nil { return err } + if cfg.Metrics.Readers == nil { + cfg.Metrics.Readers = []telemetry.MetricReader{} + } + cfg.Metrics.Readers = append(cfg.Metrics.Readers, telemetry.MetricReader{ + Pull: &telemetry.PullMetricReader{ + Exporter: telemetry.MetricExporter{ + Prometheus: &telemetry.Prometheus{ + Host: &host, + Port: &portInt, + }, + }, + }, + }) } - logger.Info( - "Serving Prometheus metrics", - zap.String(zapKeyTelemetryAddress, address), - zap.String(zapKeyTelemetryLevel, level.String()), - ) + metricproducer.GlobalManager().AddProducer(tel.ocRegistry) + opts := []sdkmetric.Option{} + for _, reader := range cfg.Metrics.Readers { + // https://github.com/open-telemetry/opentelemetry-collector/issues/8045 + r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, asyncErrorChannel) + if err != nil { + return err + } + if server != nil { + tel.servers = append(tel.servers, server) + logger.Info( + "Serving metrics", + zap.String(zapKeyTelemetryAddress, server.Addr), + zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()), + ) + } + opts = append(opts, sdkmetric.WithReader(r)) + } - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})) - server := &http.Server{ - Addr: address, - Handler: mux, + mp, err := proctelemetry.InitOpenTelemetry(res, opts, tel.disableHighCardinality) + if err != nil { + return err } - tel.servers = append(tel.servers, server) - go func() { - if serveErr := server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) { - asyncErrorChannel <- serveErr - } - }() + tel.mp = mp return nil } -func (tel *telemetryInitializer) initOpenCensus(level configtelemetry.Level, res *resource.Resource, promRegistry *prometheus.Registry) error { - tel.ocRegistry = ocmetric.NewRegistry() +func (tel *telemetryInitializer) initOpenCensus(res *resource.Resource, logger *zap.Logger, address string, level configtelemetry.Level, asyncErrorChannel chan error) error { + promRegistry := prometheus.NewRegistry() metricproducer.GlobalManager().AddProducer(tel.ocRegistry) tel.views = obsreportconfig.AllViews(level) @@ -148,33 +175,13 @@ func (tel *telemetryInitializer) initOpenCensus(level configtelemetry.Level, res } view.RegisterExporter(pe) - return nil -} -func (tel *telemetryInitializer) initOpenTelemetry(res *resource.Resource, promRegistry *prometheus.Registry) error { - // Initialize the ocRegistry, still used by the process metrics. - tel.ocRegistry = ocmetric.NewRegistry() - metricproducer.GlobalManager().AddProducer(tel.ocRegistry) - - wrappedRegisterer := prometheus.WrapRegistererWithPrefix("otelcol_", promRegistry) - // We can remove `otelprom.WithoutUnits()` when the otel-go start exposing prometheus metrics using the OpenMetrics format - // which includes metric units that prometheusreceiver uses to trim unit's suffixes from metric names. - // https://github.com/open-telemetry/opentelemetry-go/issues/3468 - exporter, err := otelprom.New( - otelprom.WithRegisterer(wrappedRegisterer), - otelprom.WithoutUnits(), - // Disabled for the moment until this becomes stable, and we are ready to break backwards compatibility. - otelprom.WithoutScopeInfo()) - if err != nil { - return fmt.Errorf("error creating otel prometheus exporter: %w", err) - } - - exporter.RegisterProducer(opencensus.NewMetricProducer()) - mp, err := proctelemetry.InitOpenTelemetry(res, []sdkmetric.Option{sdkmetric.WithReader(exporter)}, tel.disableHighCardinality) - if err != nil { - return err - } - tel.mp = mp + logger.Info( + "Serving Prometheus metrics", + zap.String(zapKeyTelemetryAddress, address), + zap.String(zapKeyTelemetryLevel, level.String()), + ) + tel.servers = append(tel.servers, proctelemetry.InitPrometheusServer(promRegistry, address, asyncErrorChannel)) return nil } diff --git a/service/telemetry/config.go b/service/telemetry/config.go index 797f53bfd2d..58c3f882449 100644 --- a/service/telemetry/config.go +++ b/service/telemetry/config.go @@ -9,6 +9,8 @@ import ( "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/internal/obsreportconfig" ) // Config defines the configurable settings for service telemetry. @@ -108,7 +110,7 @@ type MetricsConfig struct { // Readers allow configuration of metric readers to emit metrics to // any number of supported backends. - Readers []MetricReader `mapstructure:"metric_readers"` + Readers []MetricReader `mapstructure:"readers"` } // TracesConfig exposes the common Telemetry configuration for collector's internal spans. @@ -122,11 +124,35 @@ type TracesConfig struct { // Validate checks whether the current configuration is valid func (c *Config) Validate() error { - // Check when service telemetry metric level is not none, the metrics address should not be empty - if c.Metrics.Level != configtelemetry.LevelNone && c.Metrics.Address == "" { - return fmt.Errorf("collector telemetry metric address should exist when metric level is not none") + if c.Metrics.Level != configtelemetry.LevelNone && c.Metrics.Address == "" && len(c.Metrics.Readers) == 0 { + return fmt.Errorf("collector telemetry metric address or reader should exist when metric level is not none") } return nil } + +func (mr *MetricReader) Unmarshal(conf *confmap.Conf) error { + if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() { + fmt.Println(obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled()) + // only unmarshal if feature gate is enabled + return nil + } + + if conf == nil { + return nil + } + + if err := conf.Unmarshal(mr); err != nil { + return fmt.Errorf("invalid metric reader configuration: %w", err) + } + + if mr.Pull != nil { + if mr.Pull.Exporter.Prometheus == nil { + return fmt.Errorf("invalid exporter configuration") + } + return nil + } + + return fmt.Errorf("unsupported metric reader type %s", conf.AllKeys()) +} diff --git a/service/telemetry/config_test.go b/service/telemetry/config_test.go index aa7502188dc..fa8a43f13f5 100644 --- a/service/telemetry/config_test.go +++ b/service/telemetry/config_test.go @@ -7,8 +7,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/obsreportconfig" ) func TestLoadConfig(t *testing.T) { @@ -37,6 +41,18 @@ func TestLoadConfig(t *testing.T) { }, success: false, }, + { + name: "valid metric telemetry with metric readers", + cfg: &Config{ + Metrics: MetricsConfig{ + Level: configtelemetry.LevelBasic, + Readers: []MetricReader{ + {Pull: &PullMetricReader{}}, + }, + }, + }, + success: true, + }, } for _, tt := range tests { @@ -50,3 +66,83 @@ func TestLoadConfig(t *testing.T) { }) } } + +// Force the state of feature gate for a test +func setFeatureGateForTest(t testing.TB, gate *featuregate.Gate, enabled bool) func() { + originalValue := gate.IsEnabled() + require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), enabled)) + return func() { + require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), originalValue)) + } +} + +func TestUnmarshalMetricReaderWithGateOff(t *testing.T) { + defer setFeatureGateForTest(t, obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate, false)() + reader := MetricReader{} + assert.NoError(t, reader.Unmarshal(confmap.NewFromStringMap(map[string]any{"invalid": "invalid"}))) +} + +func TestUnmarshalMetricReader(t *testing.T) { + defer setFeatureGateForTest(t, obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate, true)() + tests := []struct { + name string + cfg *confmap.Conf + err string + }{ + { + name: "invalid config", + cfg: confmap.NewFromStringMap(map[string]any{"invalid": "invalid"}), + err: "unsupported metric reader type [invalid]", + }, + { + name: "nil config, nothing to do", + }, + { + name: "invalid pull reader type with valid prometheus exporter", + cfg: confmap.NewFromStringMap(map[string]any{"pull/prometheus1": PullMetricReader{ + Exporter: MetricExporter{ + Prometheus: &Prometheus{}, + }, + }}), + err: "unsupported metric reader type [pull/prometheus1]", + }, + { + name: "valid reader type, invalid config", + cfg: confmap.NewFromStringMap(map[string]any{"pull": "garbage"}), + err: "invalid metric reader configuration", + }, + { + name: "valid pull reader type, no exporter", + cfg: confmap.NewFromStringMap(map[string]any{"pull": PullMetricReader{}}), + err: "invalid exporter configuration", + }, + { + name: "valid pull reader type, invalid exporter", + cfg: confmap.NewFromStringMap(map[string]any{"pull": PullMetricReader{ + Exporter: MetricExporter{ + Prometheus: nil, + }, + }}), + err: "invalid exporter configuration", + }, + { + name: "valid pull reader type, valid prometheus exporter", + cfg: confmap.NewFromStringMap(map[string]any{"pull": PullMetricReader{ + Exporter: MetricExporter{ + Prometheus: &Prometheus{}, + }, + }}), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader := MetricReader{} + err := reader.Unmarshal(tt.cfg) + if len(tt.err) > 0 { + assert.ErrorContains(t, err, tt.err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/service/telemetry_test.go b/service/telemetry_test.go index 10cadbbaa7f..5686ad8f01f 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -104,6 +104,7 @@ func TestTelemetryInit(t *testing.T) { disableHighCard bool expectedMetrics map[string]metricValue extendedConfig bool + cfg *telemetry.Config }{ { name: "UseOpenCensusForInternalMetrics", @@ -187,26 +188,84 @@ func TestTelemetryInit(t *testing.T) { }, }, }, - } { - t.Run(tc.name, func(t *testing.T) { - tel := newColTelemetry(tc.useOtel, tc.disableHighCard, tc.extendedConfig) - buildInfo := component.NewDefaultBuildInfo() - cfg := telemetry.Config{ + { + name: "UseOTelWithSDKConfiguration", + extendedConfig: true, + cfg: &telemetry.Config{ + Metrics: telemetry.MetricsConfig{ + Level: configtelemetry.LevelDetailed, + }, Resource: map[string]*string{ semconv.AttributeServiceInstanceID: &testInstanceID, }, - Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelDetailed, - Address: testutil.GetAvailableLocalAddress(t), + }, + expectedMetrics: map[string]metricValue{ + metricPrefix + ocPrefix + counterName + "_total": { + value: 13, + labels: map[string]string{}, + }, + metricPrefix + otelPrefix + counterName + "_total": { + value: 13, + labels: map[string]string{}, + }, + metricPrefix + grpcPrefix + counterName + "_total": { + value: 11, + labels: map[string]string{ + "net_sock_peer_addr": "", + "net_sock_peer_name": "", + "net_sock_peer_port": "", + }, + }, + metricPrefix + httpPrefix + counterName + "_total": { + value: 10, + labels: map[string]string{ + "net_host_name": "", + "net_host_port": "", + }, + }, + metricPrefix + "target_info": { + value: 0, + labels: map[string]string{ + "service_name": "otelcol", + "service_version": "latest", + "service_instance_id": testInstanceID, + }, }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + tel := newColTelemetry(tc.useOtel, tc.disableHighCard, tc.extendedConfig) + buildInfo := component.NewDefaultBuildInfo() + if tc.extendedConfig { + tc.cfg.Metrics.Readers = []telemetry.MetricReader{ + { + Pull: &telemetry.PullMetricReader{ + Exporter: telemetry.MetricExporter{ + Prometheus: testutil.GetAvailableLocalAddressPrometheus(t), + }, + }, + }, + } + } + if tc.cfg == nil { + tc.cfg = &telemetry.Config{ + Resource: map[string]*string{ + semconv.AttributeServiceInstanceID: &testInstanceID, + }, + Metrics: telemetry.MetricsConfig{ + Level: configtelemetry.LevelDetailed, + Address: testutil.GetAvailableLocalAddress(t), + }, + } } - otelRes := buildResource(buildInfo, cfg) + otelRes := buildResource(buildInfo, *tc.cfg) res := pdataFromSdk(otelRes) settings := component.TelemetrySettings{ Logger: zap.NewNop(), Resource: res, } - err := tel.init(otelRes, settings, cfg, make(chan error)) + err := tel.init(otelRes, settings, *tc.cfg, make(chan error)) require.NoError(t, err) defer func() { require.NoError(t, tel.shutdown())