Skip to content

Commit

Permalink
fix(otelgrpc): correctly assign grpc status code (#4481)
Browse files Browse the repository at this point in the history
* fix(otelgrpc): correctly assign grpc status code

* Update CHANGELOG.md

* Update CHANGELOG.md

* fix: minor fix for easier reading

* fix: minor code fix up

* fix: minor code fix up

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 27, 2023
1 parent 3232d7e commit d86d6c8
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- The `go.opentelemetry.io/contrib/samplers/jaegerremote` sampler does not panic when the default HTTP round-tripper (`http.DefaultTransport`) is not `*http.Transport`. (#4045)
- The `UnaryServerInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now sets gRPC status code correctly for the `rpc.server.duration` metric. (#4481)

## [1.20.0/0.45.0/0.14.0] - 2023-09-28

Expand Down
20 changes: 9 additions & 11 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,30 +366,28 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
messageReceived.Event(ctx, 1, req)
}

var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := time.Since(t) / time.Millisecond
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), o)
}(time.Now())
before := time.Now()

resp, err := handler(ctx, req)

s, _ := status.FromError(err)
if err != nil {
s, _ := status.FromError(err)
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
if cfg.SentEvent {
messageSent.Event(ctx, 1, s.Proto())
}
} else {
statusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
if cfg.SentEvent {
messageSent.Event(ctx, 1, resp)
}
}
grpcStatusCodeAttr := statusCodeAttr(s.Code())
span.SetAttributes(grpcStatusCodeAttr)

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, grpcStatusCodeAttr)
cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...))

return resp, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
Expand All @@ -52,6 +54,19 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl
return nil, false
}

func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) {
for _, d := range data.DataPoints {
v, ok := d.Attributes.Value(semconv.RPCMethodKey)
if !ok {
return nil, false
}
if semconv.RPCMethod(name).Value == v {
return &d, true
}
}
return nil, false
}

type mockUICInvoker struct {
ctx context.Context
}
Expand Down Expand Up @@ -867,24 +882,34 @@ func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescr
func TestUnaryServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
mr := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(mr))
usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
)

for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
serviceName := "TestGrpcService"
methodName := serviceName + "/" + name
fullMethodName := "/" + methodName
// call the unary interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, grpcErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: name}, handler)
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: fullMethodName}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
span, ok := getSpanFromRecorder(sr, methodName)
require.True(t, ok, "missing span %s", methodName)
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)

// validate metric
checkManualReaderRecords(t, mr, serviceName, name, check.grpcCode)
})
}
}
Expand Down Expand Up @@ -1069,3 +1094,22 @@ func TestStreamServerInterceptorEvents(t *testing.T) {
})
}
}

func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
dpt, ok := getMetricFromData(data, name)
assert.True(t, ok)
attr := dpt.Attributes.ToSlice()
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(name),
semconv.RPCService(serviceName),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
}, attr)
}

0 comments on commit d86d6c8

Please sign in to comment.