Skip to content

Commit

Permalink
[HCP Observability] Add custom metrics for OTEL sink, improve logging…
Browse files Browse the repository at this point in the history
…, upgrade modules and cleanup metrics client (#17455)

* Add custom metrics for Exporter and transform operations

* Improve deps logging

Run go mod tidy

* Upgrade SDK and OTEL

* Remove the partial success implemetation and check for HTTP status code in metrics client

* Add x-channel

* cleanup logs in deps.go based on PR feedback

* Change to debug log and lowercase

* address test operation feedback

* use GetHumanVersion on version

* Fix error wrapping

* Fix metric names
  • Loading branch information
Achooo authored May 26, 2023
1 parent bac9fb7 commit ac0a3d7
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 91 deletions.
25 changes: 10 additions & 15 deletions agent/hcp/client/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"

"github.com/hashicorp/consul/version"
)

const (
Expand Down Expand Up @@ -72,8 +74,9 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro
}

header := make(http.Header)
header.Set("Content-Type", "application/x-protobuf")
header.Set("content-type", "application/x-protobuf")
header.Set("x-hcp-resource-id", r.String())
header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))

return &otlpClient{
client: c,
Expand Down Expand Up @@ -124,36 +127,28 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R

body, err := proto.Marshal(pbRequest)
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to marshal the request: %w", err)
}

req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to create request: %w", err)
}
req.Header = *o.header

resp, err := o.client.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to post metrics: %w", err)
}
defer resp.Body.Close()

var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
return fmt.Errorf("failed to read body: %w", err)
}

if respData.Len() != 0 {
var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return fmt.Errorf("failed to export metrics: %v", err)
}

if respProto.PartialSuccess != nil {
msg := respProto.PartialSuccess.GetErrorMessage()
return fmt.Errorf("failed to export metrics: partial success: %s", msg)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body))
}

return nil
Expand Down
13 changes: 5 additions & 8 deletions agent/hcp/client/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"

"github.com/hashicorp/consul/version"
)

func TestNewMetricsClient(t *testing.T) {
Expand Down Expand Up @@ -72,22 +74,17 @@ func TestExportMetrics(t *testing.T) {
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
wantErr: "failed to export metrics",
wantErr: "failed to export metrics: code 400",
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf")
require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf")
require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID)
require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion()))
require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")

body := colpb.ExportMetricsServiceResponse{}

if test.wantErr != "" {
body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{
ErrorMessage: "partial failure",
}
}
bytes, err := proto.Marshal(&body)

require.NoError(t, err)
Expand Down
21 changes: 14 additions & 7 deletions agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package hcp

import (
"context"
"fmt"
"net/url"
"time"

Expand All @@ -24,20 +25,24 @@ type Deps struct {
Sink metrics.MetricSink
}

func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) {
d.Client, err = hcpclient.NewClient(cfg)
func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (Deps, error) {
client, err := hcpclient.NewClient(cfg)
if err != nil {
return
return Deps{}, fmt.Errorf("failed to init client: %w:", err)
}

d.Provider, err = scada.New(cfg, logger.Named("hcp.scada"))
provider, err := scada.New(cfg, logger.Named("scada"))
if err != nil {
return
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}

d.Sink = sink(d.Client, &cfg, logger, nodeID)
sink := sink(client, &cfg, logger.Named("sink"), nodeID)

return
return Deps{
Client: client,
Provider: provider,
Sink: sink,
}, nil
}

// sink provides initializes an OTELSink which forwards Consul metrics to HCP.
Expand Down Expand Up @@ -86,5 +91,7 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo
return nil
}

logger.Debug("initialized HCP metrics sink")

return sink
}
14 changes: 14 additions & 0 deletions agent/hcp/telemetry/custom_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package telemetry

// Keys for custom Go Metrics metrics emitted only for the OTEL
// export (exporter.go) and transform (transform.go) failures and successes.
// These enable us to monitor OTEL operations.
var (
internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"}

internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"}
internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"}

internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"}
internalMetricExporterForceFlush []string = []string{"hcp", "otel", "exporter", "force_flush"}
)
15 changes: 12 additions & 3 deletions agent/hcp/telemetry/otel_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package telemetry

import (
"context"
"fmt"
"net/url"

goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
Expand Down Expand Up @@ -56,17 +58,24 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
if isEmpty(otlpMetrics) {
return nil
}
return e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
if err != nil {
goMetrics.IncrCounter(internalMetricExportFailure, 1)
return fmt.Errorf("failed to export metrics: %w", err)
}

goMetrics.IncrCounter(internalMetricExportSuccess, 1)
return nil
}

// ForceFlush is a no-op, as the MetricsClient client holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
// TODO: Emit metric when this operation occurs.
goMetrics.IncrCounter(internalMetricExporterForceFlush, 1)
return ctx.Err()
}

// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
// TODO: Emit metric when this operation occurs.
goMetrics.IncrCounter(internalMetricExporterShutdown, 1)
return ctx.Err()
}
85 changes: 77 additions & 8 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"time"

"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand All @@ -16,6 +19,14 @@ import (
"github.com/hashicorp/consul/agent/hcp/client"
)

type mockMetricsClient struct {
exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
return m.exportErr
}

func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
Expand Down Expand Up @@ -50,14 +61,6 @@ func TestAggregation(t *testing.T) {
}
}

type mockMetricsClient struct {
exportErr error
}

func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
return m.exportErr
}

func TestExport(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
Expand Down Expand Up @@ -111,6 +114,72 @@ func TestExport(t *testing.T) {
}
}

// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted
// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal()
// sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {
for name, tc := range map[string]struct {
client client.MetricsClient
metricKey []string
operation string
}{
"exportSuccessEmitsCustomMetric": {
client: &mockMetricsClient{},
metricKey: internalMetricExportSuccess,
operation: "export",
},
"exportFailureEmitsCustomMetric": {
client: &mockMetricsClient{
exportErr: fmt.Errorf("client err"),
},
metricKey: internalMetricExportFailure,
operation: "export",
},
"shutdownEmitsCustomMetric": {
metricKey: internalMetricExporterShutdown,
operation: "shutdown",
},
"forceFlushEmitsCustomMetric": {
metricKey: internalMetricExporterForceFlush,
operation: "flush",
},
} {
t.Run(name, func(t *testing.T) {
// Init global sink.
serviceName := "test.transform"
cfg := metrics.DefaultConfig(serviceName)
cfg.EnableHostname = false

sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)

// Perform operation that emits metric.
exp := NewOTELExporter(tc.client, &url.URL{})

ctx := context.Background()
switch tc.operation {
case "flush":
exp.ForceFlush(ctx)
case "shutdown":
exp.Shutdown(ctx)
default:
exp.Export(ctx, inputResourceMetrics)
}

// Collect sink metrics.
intervals := sink.Data()
require.Len(t, intervals, 1)
key := serviceName + "." + strings.Join(tc.metricKey, ".")
sv := intervals[0].Counters[key]

// Verify count for transform failure metric.
require.NotNil(t, sv)
require.NotNil(t, sv.AggregateSample)
require.Equal(t, 1, sv.AggregateSample.Count)
})
}
}

func TestForceFlush(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
Expand Down
3 changes: 2 additions & 1 deletion agent/hcp/telemetry/otlp_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"

goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand Down Expand Up @@ -70,7 +71,7 @@ func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric {
for _, m := range metrics {
o, err := metricTypeToPB(m)
if err != nil {
// TODO: Emit metric when a transformation failure occurs.
goMetrics.IncrCounter(internalMetricTransformFailure, 1)
continue
}
out = append(out, o)
Expand Down
Loading

0 comments on commit ac0a3d7

Please sign in to comment.