Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slo - capture plugin duration for http based plugins #998

Merged
merged 25 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions experimental/slo/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package slo

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics is the Prometheus-backed Collector used by this package.
// It carries the label values that identify the datasource and the endpoint
// being measured; NewMetrics fills DSName/DSType and WithEndpoint fills Endpoint.
type Metrics struct {
// DSName is the datasource name, used as the "datasource_name" label value.
DSName string
// DSType is the datasource type, used as the "datasource_type" label value.
DSType string
// Endpoint is the plugin endpoint being measured, used as the "endpoint" label value.
Endpoint Endpoint
}

// Duration accumulates the time spent in downstream calls during one plugin
// request. A pointer to it is stored in the request Context so that every
// downstream call can report into the same value.
type Duration struct {
	// Value is the summed downstream time reported through Add.
	Value float64
	// Status is the aggregated outcome of the calls reported so far.
	Status Status
	// Source is the source passed to the most recent Add call.
	Source Source
	// StatusCode is the status code passed to the most recent Add call.
	StatusCode int
	// mutex guards all fields above against concurrent Add calls.
	mutex sync.Mutex
}

// Add records one downstream call.
//
// value is added to the running total, while source and statusCode replace
// the previously stored ones. The status becomes StatusError when err is
// non-nil or statusCode is 400 or above, and it stays that way for later
// successful calls, with one exception: a call that succeeds (no error,
// status code below 400) directly after a 401 or 403 resets the status to
// StatusOK, on the assumption that the plugin re-authenticated successfully
// (token refresh, etc).
func (d *Duration) Add(value float64, source Source, statusCode int, err error) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	failed := err != nil || statusCode >= 400
	previousWasAuthFailure := d.StatusCode == 401 || d.StatusCode == 403

	switch {
	case failed:
		// A failing call must never be treated as a successful
		// re-authentication, even when its status code is below 400
		// (e.g. a transport error that carries no status code).
		d.Status = StatusError
	case previousWasAuthFailure:
		d.Status = StatusOK
	case d.Status == "":
		// First successful call.
		d.Status = StatusOK
	}

	d.StatusCode = statusCode
	d.Source = source
	d.Value += value
}

// Status is the status of the request
type Status string

// Endpoint is the endpoint of the request (health, query, resource)
type Endpoint string

// Source is the source of the error (downstream, plugin)
type Source string

const (
	StatusOK         Status   = "ok"
	StatusError      Status   = "error"
	EndpointHealth   Endpoint = "health"
	EndpointQuery    Endpoint = "query"
	EndpointResource Endpoint = "resource"
	SourceDownstream Source   = "downstream"
	SourcePlugin     Source   = "plugin"
)

// durationMetric is the histogram that every Metrics collector observes into.
// The label order below must stay in sync with the value order passed to
// WithLabelValues in Metrics.CollectDuration.
var durationMetric = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "plugins",
		Name:      "plugin_request_duration_seconds",
		Help:      "Duration of plugin execution",
	},
	[]string{
		"datasource_name",
		"datasource_type",
		"source",
		"endpoint",
		"status",
		"status_code",
	},
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I wonder whether status_code is really needed; I'll leave that up to you. In general, though, following our discussion last week, it feels like logs would be more suitable for this level of detail.


// NewMetrics creates a Prometheus-backed Collector for the given datasource.
//
// dsName is sanitized before it is used as a label value. When it cannot be
// sanitized (empty, or made up of invalid characters only) a warning carrying
// the original name is logged and the collector falls back to an empty
// datasource name. dsType is used as given.
func NewMetrics(dsName, dsType string) Collector {
	// Keep the sanitized result in its own variable so the original name is
	// still available for the warning below.
	sanitized, ok := sanitizeLabelName(dsName)
	if !ok {
		backend.Logger.Warn("Failed to sanitize datasource name for prometheus label", "datasourceName", dsName)
	}
	return Metrics{DSName: sanitized, DSType: dsType}
}

// WithEndpoint returns a copy of the collector bound to the given endpoint.
// The receiver is left untouched.
func (m Metrics) WithEndpoint(endpoint Endpoint) Collector {
	// m is a value receiver, so assigning here only changes the local copy.
	m.Endpoint = endpoint
	return m
}

// CollectDuration observes duration in the shared histogram, labelled with the
// collector's datasource and endpoint plus the given source, status and
// status code.
func (m Metrics) CollectDuration(source Source, status Status, statusCode int, duration float64) {
	// Order must match the label names declared on durationMetric.
	labelValues := []string{
		m.DSName,
		m.DSType,
		string(source),
		string(m.Endpoint),
		string(status),
		fmt.Sprint(statusCode),
	}
	durationMetric.WithLabelValues(labelValues...).Observe(duration)
}

// sanitizeLabelName removes all invalid chars from the label name.
//
// Letters and underscores are kept, spaces become underscores, and digits are
// kept unless they would be the first character of the result (a label name
// must not start with a digit). Every other character is dropped.
//
// It returns the sanitized name and true, or an empty string and false (after
// logging a warning) when the name is empty or contains only invalid chars.
// copied from https://github.com/grafana/grafana/blob/main/pkg/infra/metrics/metricutil/utils.go#L14
func sanitizeLabelName(name string) (string, bool) {
	if len(name) == 0 {
		backend.Logger.Warn(fmt.Sprintf("label name cannot be empty: %s", name))
		return "", false
	}

	var out strings.Builder
	for _, r := range name {
		switch {
		case (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_':
			out.WriteRune(r)
		case r >= '0' && r <= '9':
			// Decide on the output position, not the input position: when the
			// leading chars were dropped, a digit could otherwise end up first
			// (e.g. "-1abc" -> "1abc").
			if out.Len() > 0 {
				out.WriteRune(r)
			}
		case r == ' ':
			out.WriteRune('_')
		}
	}

	if out.Len() == 0 {
		backend.Logger.Warn(fmt.Sprintf("label name only contains invalid chars: %q", name))
		return "", false
	}

	return out.String(), true
}

// MetricsKey is the type of the context keys used by this package.
// Using a dedicated type keeps these keys from colliding with string keys
// set by other packages.
type MetricsKey string

// DurationKey is the context key under which the MetricsWrapper methods store
// the request's *Duration, and from which collectDuration reads it back.
const DurationKey MetricsKey = "downstream_duration"

// MetricsWrapper wraps a plugin and reports the duration of its health,
// query and resource calls to a Collector.
type MetricsWrapper struct {
// Name is the datasource name taken from the instance settings.
Name string
// Type is the datasource type taken from the instance settings.
Type string
// The handlers below are set by NewMetricsWrapper only when the wrapped
// plugin implements the matching interface; otherwise they stay nil.
healthcheckHandler backend.CheckHealthHandler
queryDataHandler backend.QueryDataHandler
resourceHandler backend.CallResourceHandler
// Metrics receives the measured durations.
Metrics Collector
}

// NewMetricsWrapper creates a MetricsWrapper around plugin.
//
// plugin may implement any subset of backend.CheckHealthHandler,
// backend.QueryDataHandler and backend.CallResourceHandler; handlers it does
// not implement are left nil. s supplies the datasource name and type.
// An optional Collector can be passed as c[0] to replace the default
// Prometheus-backed one (only the first value is used).
func NewMetricsWrapper(plugin any, s backend.DataSourceInstanceSettings, c ...Collector) *MetricsWrapper {
	wrapper := &MetricsWrapper{
		Name: s.Name,
		Type: s.Type,
	}

	// Only build the default collector when it is actually needed: NewMetrics
	// sanitizes the name and may log a warning, which would be misleading when
	// the caller supplies a collector. A nil override is treated as "not
	// supplied" so the wrapper never ends up without a collector.
	if len(c) > 0 && c[0] != nil {
		wrapper.Metrics = c[0]
	} else {
		wrapper.Metrics = NewMetrics(s.Name, s.Type)
	}

	// A failed assertion yields the nil zero value, leaving the handler unset.
	wrapper.healthcheckHandler, _ = plugin.(backend.CheckHealthHandler)
	wrapper.queryDataHandler, _ = plugin.(backend.QueryDataHandler)
	wrapper.resourceHandler, _ = plugin.(backend.CallResourceHandler)

	return wrapper
}

// QueryData calls the wrapped QueryDataHandler and collects metrics.
//
// A fresh *Duration is stored in the context under DurationKey so downstream
// calls can report their time; when the handler returns, the elapsed time
// minus that downstream time is reported for the query endpoint.
// It returns an error, without recording a metric, when the wrapped plugin
// does not implement backend.QueryDataHandler.
func (ds *MetricsWrapper) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	// The handler is only set when the plugin implements the interface;
	// calling through a nil interface would panic.
	if ds.queryDataHandler == nil {
		return nil, fmt.Errorf("slo: wrapped plugin does not implement backend.QueryDataHandler")
	}

	ctx = context.WithValue(ctx, DurationKey, &Duration{})
	metrics := ds.Metrics.WithEndpoint(EndpointQuery)

	start := time.Now()
	defer collectDuration(ctx, start, metrics)

	return ds.queryDataHandler.QueryData(ctx, req)
}

// CheckHealth calls the wrapped CheckHealthHandler and collects metrics.
//
// A fresh *Duration is stored in the context under DurationKey so downstream
// calls can report their time; when the handler returns, the elapsed time
// minus that downstream time is reported for the health endpoint.
// It returns an error, without recording a metric, when the wrapped plugin
// does not implement backend.CheckHealthHandler.
func (ds *MetricsWrapper) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	// The handler is only set when the plugin implements the interface;
	// calling through a nil interface would panic.
	if ds.healthcheckHandler == nil {
		return nil, fmt.Errorf("slo: wrapped plugin does not implement backend.CheckHealthHandler")
	}

	ctx = context.WithValue(ctx, DurationKey, &Duration{})
	metrics := ds.Metrics.WithEndpoint(EndpointHealth)

	start := time.Now()
	defer collectDuration(ctx, start, metrics)

	return ds.healthcheckHandler.CheckHealth(ctx, req)
}

// CallResource calls the wrapped CallResourceHandler and collects metrics.
//
// A fresh *Duration is stored in the context under DurationKey so downstream
// calls can report their time; when the handler returns, the elapsed time
// minus that downstream time is reported for the resource endpoint.
// It returns an error, without recording a metric, when the wrapped plugin
// does not implement backend.CallResourceHandler.
func (ds *MetricsWrapper) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	// The handler is only set when the plugin implements the interface;
	// calling through a nil interface would panic.
	if ds.resourceHandler == nil {
		return fmt.Errorf("slo: wrapped plugin does not implement backend.CallResourceHandler")
	}

	ctx = context.WithValue(ctx, DurationKey, &Duration{})
	metrics := ds.Metrics.WithEndpoint(EndpointResource)

	start := time.Now()
	defer collectDuration(ctx, start, metrics)

	return ds.resourceHandler.CallResource(ctx, req, sender)
}

// collectDuration reports the plugin's own share of a request's duration.
//
// It takes the time elapsed since start, subtracts the downstream time
// accumulated in the *Duration stored in ctx under DurationKey, and hands the
// result to metrics together with the recorded source, status and status
// code. Nothing is reported when ctx carries no *Duration.
func collectDuration(ctx context.Context, start time.Time, metrics Collector) {
	totalDuration := time.Since(start).Seconds()

	// Checked assertion: a missing, nil or foreign value under the key must
	// not panic inside a deferred metrics call.
	d, ok := ctx.Value(DurationKey).(*Duration)
	if !ok || d == nil {
		return
	}

	// Take a consistent snapshot under the same mutex Add uses for writes, and
	// release it before calling into the collector.
	d.mutex.Lock()
	downstream, source, status, statusCode := d.Value, d.Source, d.Status, d.StatusCode
	d.mutex.Unlock()

	// NOTE(review): if downstream calls overlap in time their sum can exceed
	// the wall-clock total, making this negative — confirm whether that needs
	// handling.
	pluginDuration := totalDuration - downstream
	metrics.CollectDuration(source, status, statusCode, pluginDuration)
}

// SanitizeLabelName is the exported, error-returning form of
// sanitizeLabelName. It returns the sanitized name, or an error when the name
// could not be sanitized.
func SanitizeLabelName(name string) (string, error) {
	sanitized, ok := sanitizeLabelName(name)
	if !ok {
		return "", fmt.Errorf("failed to sanitize label name %s", name)
	}
	return sanitized, nil
}

// Collector is the sink the MetricsWrapper reports durations to.
// Metrics is the implementation provided by this package; tests can supply
// their own through NewMetricsWrapper.
type Collector interface {
// CollectDuration records one duration together with the source, status
// and status code of the request it belongs to.
CollectDuration(source Source, status Status, statusCode int, duration float64)
// WithEndpoint returns a Collector bound to the given endpoint.
WithEndpoint(endpoint Endpoint) Collector
}
81 changes: 81 additions & 0 deletions experimental/slo/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package slo_test

import (
"context"
"testing"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental/slo"
"github.com/grafana/grafana-plugin-sdk-go/experimental/slo/test"
"github.com/stretchr/testify/assert"
)

// TestCheckHealthWithMetrics verifies that a health check routed through the
// wrapper succeeds and reports a positive duration to the collector.
func TestCheckHealthWithMetrics(t *testing.T) {
	ds, err := test.NewDS()
	// Without a datasource the rest of the test is meaningless.
	if !assert.NoError(t, err) {
		return
	}
	req := health()
	collector := &test.Collector{}
	wrapper := slo.NewMetricsWrapper(ds, *req.PluginContext.DataSourceInstanceSettings, collector)

	res, err := wrapper.CheckHealth(context.Background(), req)

	// Stop before dereferencing res when the call failed or returned nothing.
	if !assert.NoError(t, err) || !assert.NotNil(t, res) {
		return
	}
	assert.Equal(t, backend.HealthStatusOk, res.Status)
	assert.True(t, collector.Duration > 0)
}

// TestQueryWithMetrics verifies that a query routed through the wrapper
// succeeds and reports a positive duration to the collector.
func TestQueryWithMetrics(t *testing.T) {
	ds, err := test.NewDS()
	// Without a datasource the rest of the test is meaningless.
	if !assert.NoError(t, err) {
		return
	}
	req := query()
	collector := &test.Collector{}
	wrapper := slo.NewMetricsWrapper(ds, *req.PluginContext.DataSourceInstanceSettings, collector)

	_, err = wrapper.QueryData(context.Background(), req)

	assert.NoError(t, err)
	assert.True(t, collector.Duration > 0)
}

// TestResourceWithMetrics verifies that a resource call routed through the
// wrapper succeeds and reports a positive duration to the collector.
func TestResourceWithMetrics(t *testing.T) {
	ds, err := test.NewDS()
	// Without a datasource the rest of the test is meaningless.
	if !assert.NoError(t, err) {
		return
	}
	req := resource()
	collector := &test.Collector{}
	wrapper := slo.NewMetricsWrapper(ds, *req.PluginContext.DataSourceInstanceSettings, collector)

	err = wrapper.CallResource(context.Background(), req, nil)

	assert.NoError(t, err)
	assert.True(t, collector.Duration > 0)
}

// health builds a health-check request carrying the shared test plugin context.
func health() *backend.CheckHealthRequest {
	req := new(backend.CheckHealthRequest)
	req.PluginContext = pluginCtx()
	return req
}

// query builds a query request carrying the shared test plugin context.
func query() *backend.QueryDataRequest {
	req := new(backend.QueryDataRequest)
	req.PluginContext = pluginCtx()
	return req
}

// resource builds a resource request carrying the shared test plugin context.
func resource() *backend.CallResourceRequest {
	req := new(backend.CallResourceRequest)
	req.PluginContext = pluginCtx()
	return req
}

// pluginCtx returns the plugin context shared by all test requests: a single
// datasource named "foo" with empty JSON settings and no secrets.
func pluginCtx() backend.PluginContext {
	settings := &backend.DataSourceInstanceSettings{
		Name:                    "foo",
		UID:                     "uid",
		Type:                    "type",
		JSONData:                []byte(`{}`),
		DecryptedSecureJSONData: map[string]string{},
	}
	return backend.PluginContext{DataSourceInstanceSettings: settings}
}
Loading