Skip to content

Commit

Permalink
First draft for labels and filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed May 8, 2023
1 parent dae241a commit 8e25e9b
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 81 deletions.
15 changes: 0 additions & 15 deletions agent/hcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ import (
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"

<<<<<<< HEAD
=======
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/version"
>>>>>>> 2d33bd7c31 (Added telemetry agent to client and init sink in deps)
hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
hcpgnm "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/client/global_network_manager_service"
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
Expand All @@ -42,15 +37,9 @@ type MetricsConfig struct {
Endpoint string
}
type TelemetryConfig struct {
<<<<<<< HEAD
Endpoint string
Labels map[string]string
MetricsConfig *MetricsConfig
=======
Endpoint string
Labels map[string]string
MetricsOverride *MetricsConfig
>>>>>>> 2d33bd7c31 (Added telemetry agent to client and init sink in deps)
}

type BootstrapConfig struct {
Expand Down Expand Up @@ -118,11 +107,7 @@ func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig,
return &TelemetryConfig{
Endpoint: payloadConfig.Endpoint,
Labels: payloadConfig.Labels,
<<<<<<< HEAD
MetricsConfig: &MetricsConfig{
=======
MetricsOverride: &MetricsConfig{
>>>>>>> 2d33bd7c31 (Added telemetry agent to client and init sink in deps)
Filters: payloadConfig.Metrics.IncludeList,
Endpoint: payloadConfig.Metrics.Endpoint,
},
Expand Down
31 changes: 22 additions & 9 deletions agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,16 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) *telemetry.OTELSink {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)
url, err := verifyCCMRegistration(ctx, hcpClient)

telemetryCfg, err := fetchTelemetryConfig(ctx, hcpClient)
if err != nil {
logger.Error("failed to verify CCM registration: %w", err)
logger.Error("Failed to fetch telemetry config %w", err)
return nil
}

url, err := verifyCCMRegistration(telemetryCfg)
if err != nil {
logger.Error("Failed to verify CCM registration %w", err)
return nil
}

Expand All @@ -66,8 +73,10 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
}

sinkOpts := &telemetry.OTELSinkOpts{
Logger: logger,
Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second),
Logger: logger,
Labels: telemetryCfg.Labels,
Filters: telemetryCfg.MetricsConfig.Filters,
Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second),
}

sink, err := telemetry.NewOTELSink(sinkOpts)
Expand All @@ -79,18 +88,22 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
return sink
}

// verifyCCMRegistration checks that a server is registered with the HCP management plane
// by making a HTTP request to the HCP TelemetryConfig endpoint.
// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded.
func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string, error) {
func fetchTelemetryConfig(ctx context.Context, client hcpclient.Client) (*hcpclient.TelemetryConfig, error) {
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := client.FetchTelemetryConfig(reqCtx)
if err != nil {
return "", fmt.Errorf("failed to fetch telemetry config %w", err)
return nil, fmt.Errorf("failed to fetch telemetry config %w", err)
}

return telemetryCfg, nil
}

// verifyCCMRegistration checks that a server is registered with the HCP management plane
// by making a HTTP request to the HCP TelemetryConfig endpoint.
// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded.
func verifyCCMRegistration(telemetryCfg *hcpclient.TelemetryConfig) (string, error) {
endpoint := telemetryCfg.Endpoint
if override := telemetryCfg.MetricsConfig.Endpoint; override != "" {
endpoint = override
Expand Down
69 changes: 25 additions & 44 deletions agent/hcp/deps_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package hcp

import (
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -75,69 +74,51 @@ func TestSink(t *testing.T) {

func TestVerifyCCMRegistration(t *testing.T) {
for name, test := range map[string]struct {
expect func(*client.MockClient)
wantErr string
expectedURL string
telemetryCfg *client.TelemetryConfig
wantErr string
expectedURL string
}{
"failsWithFetchTelemetryFailure": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error"))
},
wantErr: "failed to fetch telemetry config",
},
"failsWithURLParseErr": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsConfig: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
},
}, nil)
telemetryCfg: &client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsConfig: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
},
},
wantErr: "failed to parse url:",
},
"noErrWithEmptyEndpoint": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
telemetryCfg: &client.TelemetryConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
},
},
expectedURL: "",
},
"success": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
telemetryCfg: &client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
},
expectedURL: "https://test.com/v1/metrics",
},
"successMetricsEndpointOverride": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "override.com",
},
}, nil)
telemetryCfg: &client.TelemetryConfig{
Endpoint: "test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "override.com",
},
},
expectedURL: "https://override.com/v1/metrics",
},
} {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
mClient := client.NewMockClient(t)
test.expect(mClient)

url, err := verifyCCMRegistration(ctx, mClient)
url, err := verifyCCMRegistration(test.telemetryCfg)
if test.wantErr != "" {
require.Empty(t, url)
require.Error(t, err)
Expand Down
42 changes: 42 additions & 0 deletions agent/hcp/telemetry/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package telemetry

import (
"fmt"
"regexp"

"github.com/hashicorp/go-multierror"
)

// FilterList holds a map of filters, i.e. regular expressions.
// These filters are used to identify which Consul metrics can be transmitted to HCP.
type FilterList struct {
filters map[string]*regexp.Regexp
}

// NewFilterList returns a FilterList which holds valid regex
// used to filter metrics. It will not fail if invalid REGEX is given, but returns a list of errors.
func NewFilterList(filters []string) (*FilterList, error) {
var mErr error
compiledList := make(map[string]*regexp.Regexp, len(filters))
for idx, filter := range filters {
re, err := regexp.Compile(filter)
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter at index %d failed: %w", idx, err))
}
compiledList[filter] = re
}
f := &FilterList{
filters: compiledList,
}
return f, mErr
}

// Match returns true if the metric name matches a REGEX in the allowed metric filters.
func (fl *FilterList) Match(name string) bool {
for _, re := range fl.filters {
if re.Match([]byte(name)) {
return true
}
}
return false
}
39 changes: 39 additions & 0 deletions agent/hcp/telemetry/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package telemetry

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFilter(t *testing.T) {
for name, tc := range map[string]struct {
filters []string
wantMatch bool
wantErr string
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
wantErr: "compilation of filter at index 0 failed",
},
"matchFound": {
filters: []string{"raft.*"},
wantMatch: true,
},
"matchNotFound": {
filters: []string{"mem.heap_size"},
wantMatch: false,
},
} {
t.Run(name, func(t *testing.T) {
f, err := NewFilterList(tc.filters)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
} else {
m := f.Match("consul.raft.peers")
require.Equal(t, tc.wantMatch, m)
}
})
}
}
35 changes: 32 additions & 3 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
)

type OTELSinkOpts struct {
Reader otelsdk.Reader
Logger hclog.Logger
Reader otelsdk.Reader
Logger hclog.Logger
Filters []string
Labels map[string]string
}

type OTELSink struct {
spaceReplacer *strings.Replacer
logger hclog.Logger
filters *FilterList

meterProvider *otelsdk.MeterProvider
meter *otelmetric.Meter
Expand Down Expand Up @@ -54,8 +57,21 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
return nil, fmt.Errorf("failed to init OTEL sink: provide valid OTELSinkOpts Reader")
}

attrs := make([]attribute.KeyValue, len(opts.Labels))
for k, v := range opts.Labels {
attrs = append(attrs, attribute.KeyValue{
Key: attribute.Key(k),
Value: attribute.StringValue(v),
})
}

filterList, err := NewFilterList(opts.Filters)
if err != nil {
opts.Logger.Error("Failed to initialize all filters: %w", err)
}

// Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically.
res := resource.NewSchemaless()
res := resource.NewWithAttributes("", attrs...)
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader))
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry")

Expand All @@ -65,6 +81,7 @@ func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
}

return &OTELSink{
filters: filterList,
spaceReplacer: strings.NewReplacer(" ", "_"),
logger: opts.Logger.Named("otel_sink"),
meterProvider: meterProvider,
Expand Down Expand Up @@ -97,6 +114,10 @@ func (o *OTELSink) IncrCounter(key []string, val float32) {
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

if !o.filters.Match(k) {
return
}

// Set value in global Gauge store.
o.gaugeStore.Store(k, float64(val), toAttributes(labels))

Expand All @@ -118,6 +139,10 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

if !o.filters.Match(k) {
return
}

o.mutex.Lock()
defer o.mutex.Unlock()

Expand All @@ -140,6 +165,10 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)

if !o.filters.Match(k) {
return
}

o.mutex.Lock()
defer o.mutex.Unlock()

Expand Down
Loading

0 comments on commit 8e25e9b

Please sign in to comment.