Skip to content

Commit

Permalink
otelgrpc: add custom attributes to the stats handler
Browse files Browse the repository at this point in the history
Fixes #3894
  • Loading branch information
inigohu committed Jun 3, 2024
1 parent 48ab4e2 commit 99ea5ca
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 80 deletions.
77 changes: 77 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// metricsInfo contains metrics information for an RPC.
type metricsInfo struct {
msgReceived int64
msgSent int64
}

// traceInfo contains tracing information for an RPC.
type traceInfo struct {
name string
kind trace.SpanKind
}

// gRPCContext contains all the information needed to record metrics and traces.
type gRPCContext struct {
metricsInfo *metricsInfo
traceInfo *traceInfo
attrs []attribute.KeyValue
record bool
}

// AddAttrs adds attributes to the given context.
func AddAttrs(ctx context.Context, attrs ...attribute.KeyValue) context.Context {
gctx, _ := gRPCContextFromContext(ctx)
gctx.addAttrs(attrs...)
return contextWithGRPCContext(ctx, gctx)
}

// add attributes to a gRPCContext.
func (g *gRPCContext) addAttrs(attrs ...attribute.KeyValue) {
g.attrs = append(g.attrs, attrs...)
}

type gRPCContextKey struct{}

// contextWithGRPCContext returns a new context with the provided gRPCContext attached.
func contextWithGRPCContext(ctx context.Context, gctx *gRPCContext) context.Context {
return context.WithValue(ctx, gRPCContextKey{}, gctx)
}

// gRPCContextFromContext retrieves a GRPCContext instance from the provided context if
// one is available. If no GRPCContext was found in the provided context a new, empty
// GRPCContext is returned and the second return value is false. In this case it is
// safe to use the GRPCContext but any attributes added to it will not be used.
func gRPCContextFromContext(ctx context.Context) (*gRPCContext, bool) { // nolint: revive
l, ok := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if !ok {
l = &gRPCContext{
metricsInfo: &metricsInfo{},
traceInfo: &traceInfo{},
record: true,
}
}
return l, ok
}
133 changes: 53 additions & 80 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ import (
"go.opentelemetry.io/otel/trace"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
record bool
}

type serverHandler struct {
*config
}
Expand All @@ -54,31 +45,17 @@ func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {

// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
ctx = extract(ctx, h.config.Propagators)
ctx = extract(ctx, h.Propagators)

name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)
gctx, _ := gRPCContextFromContext(ctx)
gctx.traceInfo.kind = trace.SpanKindServer

gctx := gRPCContext{
metricAttrs: attrs,
record: true,
}
if h.config.Filter != nil {
gctx.record = h.config.Filter(info)
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
return h.tagRPC(ctx, info)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := true
h.handleRPC(ctx, rs, isServer)
h.handleRPC(ctx, rs)
}

type clientHandler struct {
Expand All @@ -96,30 +73,17 @@ func NewClientHandler(opts ...Option) stats.Handler {

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
)
gctx, _ := gRPCContextFromContext(ctx)
gctx.traceInfo.kind = trace.SpanKindClient

gctx := gRPCContext{
metricAttrs: attrs,
record: true,
}
if h.config.Filter != nil {
gctx.record = h.config.Filter(info)
}
ctx = h.tagRPC(contextWithGRPCContext(ctx, gctx), info)

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
return inject(ctx, h.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
isServer := false
h.handleRPC(ctx, rs, isServer)
h.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -132,28 +96,47 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) { // nolint: revive // isServer is not a control flag.
func (c *config) tagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)

gctx, _ := gRPCContextFromContext(ctx)
gctx.traceInfo.name = name
gctx.addAttrs(attrs...)

if c.Filter != nil {
gctx.record = c.Filter(info)
}

if gctx.traceInfo.kind == trace.SpanKindServer {
ctx = trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx))
}

ctx, _ = c.tracer.Start(
ctx,
gctx.traceInfo.name,
trace.WithSpanKind(gctx.traceInfo.kind),
trace.WithAttributes(gctx.attrs...),
)

return contextWithGRPCContext(ctx, gctx)
}

func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
gctx, _ := gRPCContextFromContext(ctx)
span := trace.SpanFromContext(ctx)
var metricAttrs []attribute.KeyValue

var messageId int64

gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
if gctx != nil {
if !gctx.record {
return
}
metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
if !gctx.record {
return
}

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

messageId = atomic.AddInt64(&gctx.metricsInfo.msgReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(gctx.attrs...)))
if c.ReceivedEvent {
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -165,11 +148,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
)
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

messageId = atomic.AddInt64(&gctx.metricsInfo.msgSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(gctx.attrs...)))
if c.SentEvent {
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -186,36 +166,29 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
span.SetAttributes(peerAttr(p.Addr.String())...)
}
case *stats.End:
var rpcStatusAttr attribute.KeyValue

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
if isServer {
if gctx.traceInfo.kind == trace.SpanKindServer {
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
} else {
span.SetStatus(codes.Error, s.Message())
}
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
gctx.addAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(s.Code())))
} else {
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
gctx.addAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK)))
}
span.SetAttributes(rpcStatusAttr)
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
// Allocate vararg slice once.
recordOpts := []metric.RecordOption{metric.WithAttributeSet(attribute.NewSet(metricAttrs...))}
span.SetAttributes(gctx.attrs...)
span.End()

// Use floating point division here for higher precision (instead of Millisecond method).
// Measure right before calling Record() to capture as much elapsed time as possible.
elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)

c.rpcDuration.Record(ctx, elapsedTime, recordOpts...)
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
}
c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributeSet(attribute.NewSet(gctx.attrs...)))
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.metricsInfo.msgReceived), metric.WithAttributeSet(attribute.NewSet(gctx.attrs...)))
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.metricsInfo.msgSent), metric.WithAttributeSet(attribute.NewSet(gctx.attrs...)))
default:
return
}
Expand Down

0 comments on commit 99ea5ca

Please sign in to comment.