-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[HCP Observability] OTELExporter (#17128)
* Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Remove one abstraction to use the config from deps * Address PR feedback * Client configured with TLS using HCP config and retry/throttle * run go mod tidy * Create new OTELExporter which uses the MetricsClient Add transform because the conversion is in an /internal package * Fix lint error * early return when there are no metrics * Add NewOTELExporter() function * Downgrade to metrics SDK version: v1.15.0-rc.1 * Fix imports * fix small nits with comments and url.URL * Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile * Cleanup error handling and clarify empty metrics case * Fix input/expected naming in otel_transform_test.go * add comment for metric tracking * Add a general isEmpty method * Add clear error types * update to latest version 1.15.0 of OTEL
- Loading branch information
Showing
7 changed files
with
712 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
// Package telemetry implements functionality to collect, aggregate, convert and export | ||
// telemetry data in OpenTelemetry Protocol (OTLP) format. | ||
// | ||
// The entrypoint is the OpenTelemetry (OTEL) go-metrics sink which: | ||
// - Receives metric data. | ||
// - Aggregates metric data using the OTEL Go Metrics SDK. | ||
// - Exports metric data using a configurable OTEL exporter. | ||
// | ||
// The package also provides an OTEL exporter implementation to be used within the sink, which: | ||
// - Transforms metric data from the Metrics SDK OTEL representation to OTLP format. | ||
// - Exports OTLP metric data to an external endpoint using a configurable client. | ||
package telemetry |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package telemetry | ||
|
||
import ( | ||
"context" | ||
"net/url" | ||
|
||
"go.opentelemetry.io/otel/sdk/metric" | ||
"go.opentelemetry.io/otel/sdk/metric/aggregation" | ||
"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
|
||
hcpclient "github.com/hashicorp/consul/agent/hcp/client" | ||
) | ||
|
||
// OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter. | ||
// The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics. | ||
// This allows us to use a custom client - HCP authenticated MetricsClient. | ||
type OTELExporter struct { | ||
client hcpclient.MetricsClient | ||
url url.URL | ||
} | ||
|
||
// NewOTELExporter returns a configured OTELExporter | ||
func NewOTELExporter(client hcpclient.MetricsClient, url url.URL) *OTELExporter { | ||
return &OTELExporter{ | ||
client: client, | ||
url: url, | ||
} | ||
} | ||
|
||
// Temporality returns the Cumulative temporality for metrics aggregation. | ||
// Telemetry Gateway stores metrics in Prometheus format, so use Cummulative aggregation as default. | ||
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality { | ||
return metricdata.CumulativeTemporality | ||
} | ||
|
||
// Aggregation returns the Aggregation to use for an instrument kind. | ||
// The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics. | ||
// This custom version replicates that logic, but removes the panic. | ||
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation { | ||
switch kind { | ||
case metric.InstrumentKindObservableGauge: | ||
return aggregation.LastValue{} | ||
case metric.InstrumentKindHistogram: | ||
return aggregation.ExplicitBucketHistogram{ | ||
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, | ||
NoMinMax: false, | ||
} | ||
} | ||
// for metric.InstrumentKindCounter and others, default to sum. | ||
return aggregation.Sum{} | ||
} | ||
|
||
// Export serializes and transmits metric data to a receiver. | ||
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error { | ||
otlpMetrics := transformOTLP(metrics) | ||
if isEmpty(otlpMetrics) { | ||
return nil | ||
} | ||
return e.client.ExportMetrics(ctx, otlpMetrics, e.url.String()) | ||
} | ||
|
||
// ForceFlush is a no-op, as the MetricsClient client holds no state. | ||
func (e *OTELExporter) ForceFlush(ctx context.Context) error { | ||
// TODO: Emit metric when this operation occurs. | ||
return ctx.Err() | ||
} | ||
|
||
// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown. | ||
func (e *OTELExporter) Shutdown(ctx context.Context) error { | ||
// TODO: Emit metric when this operation occurs. | ||
return ctx.Err() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
package telemetry | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/otel/sdk/metric" | ||
"go.opentelemetry.io/otel/sdk/metric/aggregation" | ||
"go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
"go.opentelemetry.io/otel/sdk/resource" | ||
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" | ||
|
||
"github.com/hashicorp/consul/agent/hcp/client" | ||
) | ||
|
||
func TestTemporality(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{} | ||
require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter)) | ||
} | ||
|
||
func TestAggregation(t *testing.T) { | ||
t.Parallel() | ||
for name, test := range map[string]struct { | ||
kind metric.InstrumentKind | ||
expAgg aggregation.Aggregation | ||
}{ | ||
"gauge": { | ||
kind: metric.InstrumentKindObservableGauge, | ||
expAgg: aggregation.LastValue{}, | ||
}, | ||
"counter": { | ||
kind: metric.InstrumentKindCounter, | ||
expAgg: aggregation.Sum{}, | ||
}, | ||
"histogram": { | ||
kind: metric.InstrumentKindHistogram, | ||
expAgg: aggregation.ExplicitBucketHistogram{Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, NoMinMax: false}, | ||
}, | ||
} { | ||
test := test | ||
t.Run(name, func(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{} | ||
require.Equal(t, test.expAgg, exp.Aggregation(test.kind)) | ||
}) | ||
} | ||
} | ||
|
||
type mockMetricsClient struct { | ||
exportErr error | ||
} | ||
|
||
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { | ||
return m.exportErr | ||
} | ||
|
||
func TestExport(t *testing.T) { | ||
t.Parallel() | ||
for name, test := range map[string]struct { | ||
wantErr string | ||
metrics *metricdata.ResourceMetrics | ||
client client.MetricsClient | ||
}{ | ||
"earlyReturnWithoutScopeMetrics": { | ||
client: &mockMetricsClient{}, | ||
metrics: mutateMetrics(nil), | ||
}, | ||
"earlyReturnWithoutMetrics": { | ||
client: &mockMetricsClient{}, | ||
metrics: mutateMetrics([]metricdata.ScopeMetrics{ | ||
{Metrics: []metricdata.Metrics{}}, | ||
}, | ||
), | ||
}, | ||
"errorWithExportFailure": { | ||
client: &mockMetricsClient{ | ||
exportErr: fmt.Errorf("failed to export metrics."), | ||
}, | ||
metrics: mutateMetrics([]metricdata.ScopeMetrics{ | ||
{ | ||
Metrics: []metricdata.Metrics{ | ||
{ | ||
Name: "consul.raft.commitTime", | ||
Data: metricdata.Gauge[float64]{}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
), | ||
wantErr: "failed to export metrics", | ||
}, | ||
} { | ||
test := test | ||
t.Run(name, func(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{ | ||
client: test.client, | ||
} | ||
|
||
err := exp.Export(context.Background(), test.metrics) | ||
if test.wantErr != "" { | ||
require.Error(t, err) | ||
require.Contains(t, err.Error(), test.wantErr) | ||
return | ||
} | ||
|
||
require.NoError(t, err) | ||
}) | ||
} | ||
} | ||
|
||
func TestForceFlush(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{} | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
cancel() | ||
|
||
err := exp.ForceFlush(ctx) | ||
require.ErrorIs(t, err, context.Canceled) | ||
} | ||
|
||
func TestShutdown(t *testing.T) { | ||
t.Parallel() | ||
exp := &OTELExporter{} | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
cancel() | ||
|
||
err := exp.Shutdown(ctx) | ||
require.ErrorIs(t, err, context.Canceled) | ||
} | ||
|
||
func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics { | ||
return &metricdata.ResourceMetrics{ | ||
Resource: resource.Empty(), | ||
ScopeMetrics: m, | ||
} | ||
} |
Oops, something went wrong.