Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slo - capture plugin duration for http based plugins #998

Merged
merged 25 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions experimental/slo/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package slo

import (
"context"
"fmt"
"strings"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics is an abstraction for collecting metrics
type Metrics struct {
DSName string
DSType string
Endpoint Endpoint
}

// Duration is stored in the Context and used to collect metrics
type Duration struct {
Value float64
Status Status
Source Source
StatusCode int
}

// Status is the status of the request
type Status string

// Endpoint is the endpoint of the request (health, query, resource)
type Endpoint string

// Source is the source of the error (downstream, plugin)
type Source string

const (
StatusOK Status = "ok"
StatusError Status = "error"
EndpointHealth Endpoint = "health"
EndpointQuery Endpoint = "query"
EndpointResource Endpoint = "resource"
SourceDownstream Source = "downstream"
SourcePlugin Source = "plugin"
)

var durationMetric = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "plugins",
Name: "plugin_request_duration_seconds",
Help: "Duration of plugin execution",
}, []string{"datasource_name", "datasource_type", "source", "endpoint", "status", "status_code"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Wonder if status_code is really needed, leave that up to you. But in general, feels like logs could be more suitable for details following our discussion from last week.


// NewMetrics creates a new Metrics instance
func NewMetrics(dsName, dsType string) Metrics {
dsName, ok := sanitizeLabelName(dsName)
if !ok {
backend.Logger.Warn("Failed to sanitize datasource name for prometheus label", dsName)
}
return Metrics{DSName: dsName, DSType: dsType}
}

// WithEndpoint returns a new Metrics instance with the given endpoint
func (m *Metrics) WithEndpoint(endpoint Endpoint) Metrics {
return Metrics{DSName: m.DSName, DSType: m.DSType, Endpoint: endpoint}
}

// CollectDuration collects the duration as a metric
func (m *Metrics) CollectDuration(source Source, status Status, statusCode int, duration float64) {
durationMetric.WithLabelValues(m.DSName, m.DSType, string(source), string(m.Endpoint), string(status), fmt.Sprint(statusCode)).Observe(duration)
}

// sanitizeLabelName removes all invalid chars from the label name.
// If the label name is empty or contains only invalid chars, it will return false indicating it was not sanitized.
// copied from https://github.com/grafana/grafana/blob/main/pkg/infra/metrics/metricutil/utils.go#L14
func sanitizeLabelName(name string) (string, bool) {
if len(name) == 0 {
backend.Logger.Warn(fmt.Sprintf("label name cannot be empty: %s", name))
return "", false
}

out := strings.Builder{}
for i, b := range name {
if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9' && i > 0) {
out.WriteRune(b)
} else if b == ' ' {
out.WriteRune('_')
}
}

if out.Len() == 0 {
backend.Logger.Warn(fmt.Sprintf("label name only contains invalid chars: %q", name))
return "", false
}

return out.String(), true
}

// MetricsKey is a key for storing metrics in the context
type MetricsKey string

// DurationKey is a key for storing the duration in the context
const DurationKey MetricsKey = "downstream_duration"

// MetricsWrapper is a wrapper for a plugin that collects metrics
type MetricsWrapper struct {
Name string
ID string
healthcheckHandler backend.CheckHealthHandler
queryDataHandler backend.QueryDataHandler
resourceHandler backend.CallResourceHandler
Metrics Metrics
}

// NewMetricsWrapper creates a new MetricsWrapper instance
func NewMetricsWrapper(plugin any, s backend.DataSourceInstanceSettings) *MetricsWrapper {
wrapper := &MetricsWrapper{
Name: s.Name,
ID: s.UID,
Metrics: NewMetrics(s.Name, s.UID),
}
if h, ok := plugin.(backend.CheckHealthHandler); ok {
wrapper.healthcheckHandler = h
}
if q, ok := plugin.(backend.QueryDataHandler); ok {
wrapper.queryDataHandler = q
}
if r, ok := plugin.(backend.CallResourceHandler); ok {
wrapper.resourceHandler = r
}
return wrapper
}

// QueryData calls the QueryDataHandler and collects metrics
func (ds *MetricsWrapper) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
ctx = context.WithValue(ctx, DurationKey, &Duration{Value: 0})
metrics := ds.Metrics.WithEndpoint(EndpointQuery)

start := time.Now()

defer func() {
collectDuration(ctx, start, metrics)
}()

return ds.queryDataHandler.QueryData(ctx, req)
}

// CheckHealth calls the CheckHealthHandler and collects metrics
func (ds *MetricsWrapper) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
ctx = context.WithValue(ctx, DurationKey, &Duration{Value: 0})
metrics := ds.Metrics.WithEndpoint(EndpointHealth)

start := time.Now()

defer func() {
collectDuration(ctx, start, metrics)
}()

return ds.healthcheckHandler.CheckHealth(ctx, req)
}

// CallResource calls the CallResourceHandler and collects metrics
func (ds *MetricsWrapper) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
ctx = context.WithValue(ctx, DurationKey, &Duration{Value: 0})
metrics := ds.Metrics.WithEndpoint(EndpointResource)

start := time.Now()

defer func() {
collectDuration(ctx, start, metrics)
}()

return ds.resourceHandler.CallResource(ctx, req, sender)
}

func collectDuration(ctx context.Context, start time.Time, metrics Metrics) {
totalDuration := time.Since(start).Seconds()
downstreamDuration := ctx.Value(DurationKey)
if downstreamDuration != nil {
d := downstreamDuration.(*Duration)
pluginDuration := totalDuration - d.Value
metrics.CollectDuration(d.Source, d.Status, d.StatusCode, pluginDuration)
}
}
134 changes: 61 additions & 73 deletions experimental/slo/slo_middleware.go
Original file line number Diff line number Diff line change
@@ -1,102 +1,90 @@
package slo

import (
"errors"
"fmt"
"context"
"net/http"
"strings"
"syscall"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var Logger = log.DefaultLogger
// DurationMiddlewareName is the middleware name used by DurationMiddleware.
const DurationMiddlewareName = "Duration"

var duration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "plugins",
Name: "plugin_external_requests_duration_seconds",
Help: "Duration of requests to external services",
}, []string{"datasource_name", "datasource_type", "error_source"})

const DataSourceSLOMiddlewareName = "slo"

// Middleware captures duration of requests to external services and the source of errors
func Middleware() httpclient.Middleware {
return httpclient.NamedMiddlewareFunc(DataSourceSLOMiddlewareName, RoundTripper)
// DurationMiddleware applies the duration to the context.
func DurationMiddleware() httpclient.Middleware {
return httpclient.NamedMiddlewareFunc(DurationMiddlewareName, func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
return DurationRoundTripper(opts, next)
})
}

// RoundTripper captures duration of requests to external services and the source of errors
func RoundTripper(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
name, kind, err := getDSInfo(opts)
func AddDurationMiddleware(ctx context.Context, s *backend.DataSourceInstanceSettings) (httpclient.Options, error) {
opts, err := s.HTTPClientOptions(ctx)
if err != nil {
Logger.Error("failed to get datasource info", "error", err)
return next
return opts, err
}
opts.Middlewares = append(opts.Middlewares, DurationMiddleware())
return opts, nil
}

// DurationRoundTripper captures the duration of the request in the context
func DurationRoundTripper(_ httpclient.Options, next http.RoundTripper) http.RoundTripper {
return httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
var duration *Duration
var httpErr error
var source = SourceDownstream
var statusCode int

start := time.Now()
var errorSource = "none"

ctx := req.Context()
val := ctx.Value(DurationKey)
if val == nil {
// TODO: this doesn't seem to change the context upstream
// so we always have to add the value to the context in the QueryData method
duration = &Duration{Value: 0}
ctx = context.WithValue(ctx, DurationKey, duration)
req = req.WithContext(ctx)
} else {
duration = val.(*Duration)
}

defer func() {
duration.WithLabelValues(name, kind, errorSource).Observe(time.Since(start).Seconds())
duration.Value += time.Since(start).Seconds()
if duration.Status == "" {
duration.Status = "ok"
}
if httpErr != nil {
duration.Status = "error"
}
if statusCode >= 400 {
duration.Status = "error"
}

// If the status code is now ok, but the previous status code was 401 or 403, mark it as ok
// assuming a successful re-authentication ( token refresh, etc )
if statusCode < 400 && (duration.StatusCode == 401 || duration.StatusCode == 403) {
duration.Status = "ok"
}

duration.StatusCode = statusCode
duration.Source = source
}()

res, err := next.RoundTrip(req)
if res != nil && res.StatusCode >= 400 {
errorSource = string(backend.ErrorSourceFromHTTPStatus(res.StatusCode))
if err != nil {
httpErr = err
}
if errors.Is(err, syscall.ECONNREFUSED) {
errorSource = string(backend.ErrorSourceDownstream)
if res != nil {
statusCode = res.StatusCode
source = Source(FromStatus(backend.Status(res.StatusCode)))
}
return res, err
})
}

func getDSInfo(opts httpclient.Options) (string, string, error) {
datasourceName, exists := opts.Labels["datasource_name"]
if !exists {
return "", "", errors.New("datasource_name label not found")
}

datasourceName, err := SanitizeLabelName(datasourceName)
// if the datasource named cannot be turned into a prometheus
// label we will skip instrumenting these metrics.
if err != nil {
return "", "", err
}

datasourceType, exists := opts.Labels["datasource_type"]
if !exists {
return "", "", errors.New("datasource_type label not found")
}

return datasourceName, datasourceType, nil
}

// SanitizeLabelName removes all invalid chars from the label name.
// If the label name is empty or contains only invalid chars, it
// will return an error.
func SanitizeLabelName(name string) (string, error) {
if len(name) == 0 {
return "", errors.New("label name cannot be empty")
}

out := strings.Builder{}
for i, b := range name {
if (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || b == '_' || (b >= '0' && b <= '9' && i > 0) {
out.WriteRune(b)
} else if b == ' ' {
out.WriteRune('_')
}
}

if out.Len() == 0 {
return "", fmt.Errorf("label name only contains invalid chars: %q", name)
}

return out.String(), nil
// FromStatus returns the error source from backend status
func FromStatus(status backend.Status) backend.ErrorSource {
return backend.ErrorSourceFromHTTPStatus(int(status))
}
Loading