Skip to content

Commit

Permalink
fix(otelgrpc): correctly assign grpc status code
Browse files Browse the repository at this point in the history
  • Loading branch information
liufuyang committed Oct 25, 2023
1 parent 5adc271 commit a58a6de
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 11 deletions.
21 changes: 13 additions & 8 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,31 +366,32 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
messageReceived.Event(ctx, 1, req)
}

var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := time.Since(t) / time.Millisecond
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), o)
}(time.Now())
before := time.Now()
var grpcStatusCode grpc_codes.Code

resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
grpcStatusCode = s.Code()
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
if cfg.SentEvent {
messageSent.Event(ctx, 1, s.Proto())
}
} else {
statusCode = grpc_codes.OK
grpcStatusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
if cfg.SentEvent {
messageSent.Event(ctx, 1, resp)
}
}

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, elapsedTime, o)

return resp, err
}
}
Expand Down Expand Up @@ -580,3 +581,7 @@ func serverStatus(grpcStatus *status.Status) (codes.Code, string) {
return codes.Unset, ""
}
}

func ptr[T any](v T) *T {
return &v
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package test
import (
"context"
"errors"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"io"
"net"
"strings"
Expand Down Expand Up @@ -52,6 +54,20 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl
return nil, false
}

func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) {
for _, d := range data.DataPoints {
v, ok := d.Attributes.Value(semconv.RPCMethodKey)
if !ok {
return nil, false
}
if semconv.RPCMethod(name).Value == v {
return &d, true
}

}
return nil, false
}

type mockUICInvoker struct {
ctx context.Context
}
Expand Down Expand Up @@ -867,24 +883,34 @@ func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescr
func TestUnaryServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
mr := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(mr))
usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
)

for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
serviceName := "TestGrpcService"
methodName := serviceName + "/" + name
fullMethodName := "/" + methodName
// call the unary interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, grpcErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: name}, handler)
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: fullMethodName}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
span, ok := getSpanFromRecorder(sr, methodName)
require.True(t, ok, "missing span %s", methodName)
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)

// validate metric
checkManualReaderRecords(t, mr, serviceName, name, check.grpcCode)
})
}
}
Expand Down Expand Up @@ -1069,3 +1095,24 @@ func TestStreamServerInterceptorEvents(t *testing.T) {
})
}
}

func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])

dpt, ok := getMetricFromData(data, name)
assert.True(t, ok)
attr := dpt.Attributes.ToSlice()
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(name),
semconv.RPCService(serviceName),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
}, attr)

}

0 comments on commit a58a6de

Please sign in to comment.