From 8e25e9b89699f33a8d735688f3582532db8f9deb Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Fri, 28 Apr 2023 15:00:39 -0400 Subject: [PATCH] First draft for labels and filters --- agent/hcp/client/client.go | 15 ------ agent/hcp/deps.go | 31 ++++++++---- agent/hcp/deps_test.go | 69 ++++++++++----------------- agent/hcp/telemetry/filter.go | 42 ++++++++++++++++ agent/hcp/telemetry/filter_test.go | 39 +++++++++++++++ agent/hcp/telemetry/otel_sink.go | 35 ++++++++++++-- agent/hcp/telemetry/otel_sink_test.go | 45 +++++++++++++---- 7 files changed, 195 insertions(+), 81 deletions(-) create mode 100644 agent/hcp/telemetry/filter.go create mode 100644 agent/hcp/telemetry/filter_test.go diff --git a/agent/hcp/client/client.go b/agent/hcp/client/client.go index 79de0e558686..642beb8d75d0 100644 --- a/agent/hcp/client/client.go +++ b/agent/hcp/client/client.go @@ -15,11 +15,6 @@ import ( httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" -<<<<<<< HEAD -======= - "github.com/hashicorp/consul/agent/hcp/config" - "github.com/hashicorp/consul/version" ->>>>>>> 2d33bd7c31 (Added telemetry agent to client and init sink in deps) hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service" hcpgnm "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/client/global_network_manager_service" gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" @@ -42,15 +37,9 @@ type MetricsConfig struct { Endpoint string } type TelemetryConfig struct { -<<<<<<< HEAD Endpoint string Labels map[string]string MetricsConfig *MetricsConfig -======= - Endpoint string - Labels map[string]string - MetricsOverride *MetricsConfig ->>>>>>> 2d33bd7c31 (Added telemetry agent to client and init sink in deps) } type BootstrapConfig struct { @@ -118,11 +107,7 @@ func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, return &TelemetryConfig{ Endpoint: payloadConfig.Endpoint, Labels: payloadConfig.Labels, -<<<<<<< HEAD MetricsConfig: &MetricsConfig{ -======= - MetricsOverride: &MetricsConfig{ ->>>>>>> 2d33bd7c31 (Added telemetry agent to client and init sink in deps) Filters: payloadConfig.Metrics.IncludeList, Endpoint: payloadConfig.Metrics.Endpoint, }, diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 74ef694e6247..90599a179032 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -47,9 +47,16 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) { func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) *telemetry.OTELSink { ctx := context.Background() ctx = hclog.WithContext(ctx, logger) - url, err := verifyCCMRegistration(ctx, hcpClient) + + telemetryCfg, err := fetchTelemetryConfig(ctx, hcpClient) if err != nil { - logger.Error("failed to verify CCM registration: %w", err) + logger.Error("Failed to fetch telemetry config %w", err) + return nil + } + + url, err := verifyCCMRegistration(telemetryCfg) + if err != nil { + logger.Error("Failed to verify CCM registration %w", err) return nil } @@ -66,8 +73,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo } sinkOpts := &telemetry.OTELSinkOpts{ - Logger: logger, - Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second), + Logger: logger, + Labels: telemetryCfg.Labels, + Filters: telemetryCfg.MetricsConfig.Filters, + Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second), } sink, err := telemetry.NewOTELSink(sinkOpts) @@ -79,18 +88,22 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo return sink } -// verifyCCMRegistration checks that a server is registered with the HCP management plane -// by making a HTTP request to the HCP TelemetryConfig endpoint. -// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded. -func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string, error) { +func fetchTelemetryConfig(ctx context.Context, client hcpclient.Client) (*hcpclient.TelemetryConfig, error) { reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() telemetryCfg, err := client.FetchTelemetryConfig(reqCtx) if err != nil { - return "", fmt.Errorf("failed to fetch telemetry config %w", err) + return nil, fmt.Errorf("failed to fetch telemetry config %w", err) } + return telemetryCfg, nil +} + +// verifyCCMRegistration checks that a server is registered with the HCP management plane +// by making a HTTP request to the HCP TelemetryConfig endpoint. +// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded. +func verifyCCMRegistration(telemetryCfg *hcpclient.TelemetryConfig) (string, error) { endpoint := telemetryCfg.Endpoint if override := telemetryCfg.MetricsConfig.Endpoint; override != "" { endpoint = override diff --git a/agent/hcp/deps_test.go b/agent/hcp/deps_test.go index 8198d1f390a7..ac79d0fa253d 100644 --- a/agent/hcp/deps_test.go +++ b/agent/hcp/deps_test.go @@ -1,7 +1,6 @@ package hcp import ( - "context" "fmt" "testing" @@ -75,69 +74,51 @@ func TestSink(t *testing.T) { func TestVerifyCCMRegistration(t *testing.T) { for name, test := range map[string]struct { - expect func(*client.MockClient) - wantErr string - expectedURL string + telemetryCfg *client.TelemetryConfig + wantErr string + expectedURL string }{ - "failsWithFetchTelemetryFailure": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error")) - }, - wantErr: "failed to fetch telemetry config", - }, "failsWithURLParseErr": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - // Minimum 2 chars for a domain to be valid. - Endpoint: "s", - MetricsConfig: &client.MetricsConfig{ - // Invalid domain chars - Endpoint: " ", - }, - }, nil) + telemetryCfg: &client.TelemetryConfig{ + // Minimum 2 chars for a domain to be valid. + Endpoint: "s", + MetricsConfig: &client.MetricsConfig{ + // Invalid domain chars + Endpoint: " ", + }, }, wantErr: "failed to parse url:", }, "noErrWithEmptyEndpoint": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ + telemetryCfg: &client.TelemetryConfig{ + Endpoint: "", + MetricsConfig: &client.MetricsConfig{ Endpoint: "", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "", - }, - }, nil) + }, }, expectedURL: "", }, "success": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "test.com", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "", - }, - }, nil) + telemetryCfg: &client.TelemetryConfig{ + Endpoint: "test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "", + }, }, expectedURL: "https://test.com/v1/metrics", }, "successMetricsEndpointOverride": { - expect: func(mockClient *client.MockClient) { - mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{ - Endpoint: "test.com", - MetricsConfig: &client.MetricsConfig{ - Endpoint: "override.com", - }, - }, nil) + telemetryCfg: &client.TelemetryConfig{ + Endpoint: "test.com", + MetricsConfig: &client.MetricsConfig{ + Endpoint: "override.com", + }, }, expectedURL: "https://override.com/v1/metrics", }, } { t.Run(name, func(t *testing.T) { - ctx := context.Background() - mClient := client.NewMockClient(t) - test.expect(mClient) - - url, err := verifyCCMRegistration(ctx, mClient) + url, err := verifyCCMRegistration(test.telemetryCfg) if test.wantErr != "" { require.Empty(t, url) require.Error(t, err) diff --git a/agent/hcp/telemetry/filter.go b/agent/hcp/telemetry/filter.go new file mode 100644 index 000000000000..d9cfd3418122 --- /dev/null +++ b/agent/hcp/telemetry/filter.go @@ -0,0 +1,42 @@ +package telemetry + +import ( + "fmt" + "regexp" + + "github.com/hashicorp/go-multierror" +) + +// FilterList holds a map of filters, i.e. regular expressions. +// These filters are used to identify which Consul metrics can be transmitted to HCP. +type FilterList struct { + filters map[string]*regexp.Regexp +} + +// NewFilterList returns a FilterList which holds valid regex +// used to filter metrics. It will not fail if invalid REGEX is given, but returns a list of errors. +func NewFilterList(filters []string) (*FilterList, error) { + var mErr error + compiledList := make(map[string]*regexp.Regexp, len(filters)) + for idx, filter := range filters { + re, err := regexp.Compile(filter) + if err != nil { + mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter at index %d failed: %w", idx, err)) + } + compiledList[filter] = re + } + f := &FilterList{ + filters: compiledList, + } + return f, mErr +} + +// Match returns true if the metric name matches a REGEX in the allowed metric filters. +func (fl *FilterList) Match(name string) bool { + for _, re := range fl.filters { + if re.Match([]byte(name)) { + return true + } + } + return false +} diff --git a/agent/hcp/telemetry/filter_test.go b/agent/hcp/telemetry/filter_test.go new file mode 100644 index 000000000000..8b6bc453cc84 --- /dev/null +++ b/agent/hcp/telemetry/filter_test.go @@ -0,0 +1,39 @@ +package telemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFilter(t *testing.T) { + for name, tc := range map[string]struct { + filters []string + wantMatch bool + wantErr string + }{ + "badFilterRegex": { + filters: []string{"(*LF)"}, + wantErr: "compilation of filter at index 0 failed", + }, + "matchFound": { + filters: []string{"raft.*"}, + wantMatch: true, + }, + "matchNotFound": { + filters: []string{"mem.heap_size"}, + wantMatch: false, + }, + } { + t.Run(name, func(t *testing.T) { + f, err := NewFilterList(tc.filters) + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + } else { + m := f.Match("consul.raft.peers") + require.Equal(t, tc.wantMatch, m) + } + }) + } +} diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index 124a5a1f5af1..88f47c52d940 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -20,13 +20,16 @@ import ( ) type OTELSinkOpts struct { - Reader otelsdk.Reader - Logger hclog.Logger + Reader otelsdk.Reader + Logger hclog.Logger + Filters []string + Labels map[string]string } type OTELSink struct { spaceReplacer *strings.Replacer logger hclog.Logger + filters *FilterList meterProvider *otelsdk.MeterProvider meter *otelmetric.Meter @@ -54,8 +57,21 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { return nil, fmt.Errorf("failed to init OTEL sink: provide valid OTELSinkOpts Reader") } + attrs := make([]attribute.KeyValue, len(opts.Labels)) + for k, v := range opts.Labels { + attrs = append(attrs, attribute.KeyValue{ + Key: attribute.Key(k), + Value: attribute.StringValue(v), + }) + } + + filterList, err := NewFilterList(opts.Filters) + if err != nil { + opts.Logger.Error("Failed to initialize all filters: %w", err) + } + // Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically. - res := resource.NewSchemaless() + res := resource.NewWithAttributes("", attrs...) meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader)) meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry") @@ -65,6 +81,7 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) { } return &OTELSink{ + filters: filterList, spaceReplacer: strings.NewReplacer(" ", "_"), logger: opts.Logger.Named("otel_sink"), meterProvider: meterProvider, @@ -97,6 +114,10 @@ func (o *OTELSink) IncrCounter(key []string, val float32) { func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.Match(k) { + return + } + // Set value in global Gauge store. o.gaugeStore.Store(k, float64(val), toAttributes(labels)) @@ -118,6 +139,10 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.Match(k) { + return + } + o.mutex.Lock() defer o.mutex.Unlock() @@ -140,6 +165,10 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) { k := o.flattenKey(key) + if !o.filters.Match(k) { + return + } + o.mutex.Lock() defer o.mutex.Unlock() diff --git a/agent/hcp/telemetry/otel_sink_test.go b/agent/hcp/telemetry/otel_sink_test.go index 92a1bbc6316e..f5f640af0727 100644 --- a/agent/hcp/telemetry/otel_sink_test.go +++ b/agent/hcp/telemetry/otel_sink_test.go @@ -15,11 +15,16 @@ import ( ) var ( - attrs = attribute.NewSet(attribute.KeyValue{ + expectedResource = resource.NewWithAttributes("", attribute.KeyValue{ Key: attribute.Key("server.id"), Value: attribute.StringValue("test"), }) + attrs = attribute.NewSet(attribute.KeyValue{ + Key: attribute.Key("metric.label"), + Value: attribute.StringValue("test"), + }) + expectedMetrics = map[string]metricdata.Metrics{ "consul.raft.leader": { Name: "consul.raft.leader", @@ -127,6 +132,16 @@ func TestNewOTELSink(t *testing.T) { Reader: nil, }, }, + "success": { + opts: &OTELSinkOpts{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + Reader: metric.NewManualReader(), + Labels: map[string]string{ + "server": "test", + }, + Filters: []string{"raft"}, + }, + }, } { t.Run(name, func(t *testing.T) { sink, err := NewOTELSink(test.opts) @@ -147,8 +162,12 @@ func TestOTELSink(t *testing.T) { ctx := context.Background() opts := &OTELSinkOpts{ - Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), - Reader: reader, + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + Reader: reader, + Filters: []string{"raft", "autopilot"}, + Labels: map[string]string{ + "server.id": "test", + }, } sink, err := NewOTELSink(opts) @@ -156,7 +175,7 @@ func TestOTELSink(t *testing.T) { labels := []gometrics.Label{ { - Name: "server.id", + Name: "metric.label", Value: "test", }, } @@ -175,13 +194,19 @@ func TestOTELSink(t *testing.T) { require.NoError(t, err) // Validate resource - require.Equal(t, resource.NewSchemaless(), collected.Resource) + require.Equal(t, expectedResource, collected.Resource) + require.Equal(t, 1, len(collected.ScopeMetrics)) + + collectedMetrics := collected.ScopeMetrics[0].Metrics + + collectedMetricsMap := make(map[string]metricdata.Metrics, len(collectedMetrics)) + for _, v := range collectedMetrics { + collectedMetricsMap[v.Name] = v + } - // Validate metrics - for _, actual := range collected.ScopeMetrics[0].Metrics { - name := actual.Name - expected, ok := expectedMetrics[name] - require.True(t, ok, "metric key %s should be in expectedMetrics map", name) + for key, expected := range expectedMetrics { + actual, ok := collectedMetricsMap[key] + require.True(t, ok, "metric key %s should be in expectedMetrics map", key) isSameMetrics(t, expected, actual) } }