Skip to content

Commit

Permalink
otel: update usage of otelgrpc interceptors to use stat handlers
Browse files Browse the repository at this point in the history
The otelgrpc interceptors were deprecated. This updates the areas where
these were used to use the stat handlers instead of the interceptors.
This helps with creating a single method for both unary and stream rpcs
and also ensures we aren't using a deprecated function for the future.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
  • Loading branch information
jsternberg committed Mar 28, 2024
1 parent 5e0fe27 commit b0183dd
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 116 deletions.
33 changes: 14 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/moby/buildkit/session/grpchijack"
"github.com/moby/buildkit/util/appdefaults"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/grpcutil"
"github.com/moby/buildkit/util/tracing/otlptracegrpc"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand All @@ -29,6 +30,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/stats"
)

type Client struct {
Expand All @@ -48,9 +50,6 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
}
needDialer := true

var unary []grpc.UnaryClientInterceptor
var stream []grpc.StreamClientInterceptor

var customTracer bool // allows manually setting disabling tracing even if tracer in context
var tracerProvider trace.TracerProvider
var tracerDelegate TracerDelegate
Expand Down Expand Up @@ -101,9 +100,14 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
}

if tracerProvider != nil {
propagators := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
gopts = append(gopts, grpc.WithStatsHandler(
grpcutil.StatsFilter(otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(tracerProvider),
otelgrpc.WithPropagators(
propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}),
),
), traceServiceExportFilter),
))
}

if needDialer {
Expand Down Expand Up @@ -145,12 +149,8 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error
}

gopts = append(gopts, grpc.WithAuthority(authority))

unary = append(unary, grpcerrors.UnaryClientInterceptor)
stream = append(stream, grpcerrors.StreamClientInterceptor)

gopts = append(gopts, grpc.WithChainUnaryInterceptor(unary...))
gopts = append(gopts, grpc.WithChainStreamInterceptor(stream...))
gopts = append(gopts, grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor))
gopts = append(gopts, grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor))
gopts = append(gopts, customDialOptions...)

conn, err := grpc.DialContext(ctx, address, gopts...)
Expand Down Expand Up @@ -386,13 +386,8 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err
return nil, nil
}

func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return invoker(ctx, method, req, reply, cc, opts...)
}
return intercept(ctx, method, req, reply, cc, invoker, opts...)
}
func traceServiceExportFilter(info *stats.RPCTagInfo) bool {
return strings.HasSuffix(info.FullMethodName, "opentelemetry.proto.collector.trace.v1.TraceService/Export")
}

type withGRPCDialOption struct {
Expand Down
59 changes: 22 additions & 37 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
sddaemon "github.com/coreos/go-systemd/v22/daemon"
"github.com/docker/docker/pkg/reexec"
"github.com/gofrs/flock"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/cache/remotecache/azblob"
"github.com/moby/buildkit/cache/remotecache/gha"
Expand All @@ -47,6 +46,7 @@ import (
"github.com/moby/buildkit/util/archutil"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/grpcutil"
"github.com/moby/buildkit/util/profiler"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/stack"
Expand All @@ -62,16 +62,15 @@ import (
"github.com/urfave/cli"
"go.etcd.io/bbolt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthv1 "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/stats"
)

func init() {
Expand Down Expand Up @@ -270,16 +269,17 @@ func main() {
return err
}

streamTracer := otelgrpc.StreamServerInterceptor( //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
statsHandler := grpcutil.StatsFilter(otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
otelgrpc.WithPropagators(propagators),
)
), traceServiceExportFilter)

unary := grpc_middleware.ChainUnaryServer(unaryInterceptor(ctx, tp, mp), grpcerrors.UnaryServerInterceptor)
stream := grpc_middleware.ChainStreamServer(streamTracer, grpcerrors.StreamServerInterceptor)

opts := []grpc.ServerOption{grpc.UnaryInterceptor(unary), grpc.StreamInterceptor(stream)}
opts := []grpc.ServerOption{
grpc.StatsHandler(statsHandler),
grpc.ChainUnaryInterceptor(unaryInterceptor, grpcerrors.UnaryServerInterceptor),
grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor),
}
server := grpc.NewServer(opts...)

// relative path does not work with nightlyone/lockfile
Expand Down Expand Up @@ -660,38 +660,23 @@ func getListener(addr string, uid, gid int, tlsConfig *tls.Config) (net.Listener
}
}

func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider, mp metric.MeterProvider) grpc.UnaryServerInterceptor {
withTrace := otelgrpc.UnaryServerInterceptor( //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
otelgrpc.WithPropagators(propagators),
)

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.WithStack(context.Canceled))

go func() {
select {
case <-ctx.Done():
case <-globalCtx.Done():
cancel(context.Cause(globalCtx))
}
}()
func traceServiceExportFilter(info *stats.RPCTagInfo) bool {
return strings.HasSuffix(info.FullMethodName, "opentelemetry.proto.collector.trace.v1.TraceService/Export")
}

if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return handler(ctx, req)
}
func unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") {
return handler(ctx, req)
}

resp, err = withTrace(ctx, req, info, handler)
if err != nil {
bklog.G(ctx).Errorf("%s returned error: %v", info.FullMethod, err)
if logrus.GetLevel() >= logrus.DebugLevel {
fmt.Fprintf(os.Stderr, "%+v", stack.Formatter(grpcerrors.FromGRPC(err)))
}
resp, err = handler(ctx, req)
if err != nil {
bklog.G(ctx).Errorf("%s returned error: %v", info.FullMethod, err)
if logrus.GetLevel() >= logrus.DebugLevel {
fmt.Fprintf(os.Stderr, "%+v", stack.Formatter(grpcerrors.FromGRPC(err)))
}
return
}
return resp, err
}

func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
Expand Down
29 changes: 8 additions & 21 deletions session/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"time"

"github.com/containerd/containerd/defaults"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/grpcutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand All @@ -31,9 +31,6 @@ func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
}

func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) {
var unary []grpc.UnaryClientInterceptor
var stream []grpc.StreamClientInterceptor

var dialCount int64
dialer := grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
if c := atomic.AddInt64(&dialCount, 1); c > 1 {
Expand All @@ -47,26 +44,16 @@ func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor),
}

if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
unary = append(unary, filterClient(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators)))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
}

unary = append(unary, grpcerrors.UnaryClientInterceptor)
stream = append(stream, grpcerrors.StreamClientInterceptor)

if len(unary) == 1 {
dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(unary[0]))
} else if len(unary) > 1 {
dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unary...)))
}

if len(stream) == 1 {
dialOpts = append(dialOpts, grpc.WithStreamInterceptor(stream[0]))
} else if len(stream) > 1 {
dialOpts = append(dialOpts, grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(stream...)))
statsHandler := grpcutil.StatsFilter(otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(span.TracerProvider()),
otelgrpc.WithPropagators(propagators),
), filterHealthCheck)
dialOpts = append(dialOpts, grpc.WithStatsHandler(statsHandler))
}

cc, err := grpc.DialContext(ctx, "localhost", dialOpts...)
Expand Down
52 changes: 13 additions & 39 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ import (
"strings"
"sync"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/util/grpcerrors"
"github.com/moby/buildkit/util/grpcutil"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/stats"
)

const (
Expand Down Expand Up @@ -53,29 +54,17 @@ type Session struct {
func NewSession(ctx context.Context, name, sharedKey string) (*Session, error) {
id := identity.NewID()

var unary []grpc.UnaryServerInterceptor
var stream []grpc.StreamServerInterceptor

serverOpts := []grpc.ServerOption{}

if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
unary = append(unary, filterServer(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators)))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
stream = append(stream, otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681
serverOpts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor),
grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor),
}

unary = append(unary, grpcerrors.UnaryServerInterceptor)
stream = append(stream, grpcerrors.StreamServerInterceptor)

if len(unary) == 1 {
serverOpts = append(serverOpts, grpc.UnaryInterceptor(unary[0]))
} else if len(unary) > 1 {
serverOpts = append(serverOpts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unary...)))
}

if len(stream) == 1 {
serverOpts = append(serverOpts, grpc.StreamInterceptor(stream[0]))
} else if len(stream) > 1 {
serverOpts = append(serverOpts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(stream...)))
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
statsHandler := grpcutil.StatsFilter(otelgrpc.NewServerHandler(
otelgrpc.WithTracerProvider(span.TracerProvider()),
otelgrpc.WithPropagators(propagators),
), filterHealthCheck)
serverOpts = append(serverOpts, grpc.StatsHandler(statsHandler))
}

s := &Session{
Expand Down Expand Up @@ -168,21 +157,6 @@ func MethodURL(s, m string) string {
return "/" + s + "/" + m
}

// updates needed in opentelemetry-contrib to avoid this
func filterServer(intercept grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if strings.HasSuffix(info.FullMethod, "Health/Check") {
return handler(ctx, req)
}
return intercept(ctx, req, info, handler)
}
}

func filterClient(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if strings.HasSuffix(method, "Health/Check") {
return invoker(ctx, method, req, reply, cc, opts...)
}
return intercept(ctx, method, req, reply, cc, invoker, opts...)
}
func filterHealthCheck(info *stats.RPCTagInfo) bool {
return strings.HasSuffix(info.FullMethodName, "Health/Check")
}
47 changes: 47 additions & 0 deletions util/grpcutil/stats_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package grpcutil

import (
"context"

"google.golang.org/grpc/stats"
)

type contextKey int

const filterContextKey contextKey = iota

type StatsFilterFunc func(info *stats.RPCTagInfo) bool

func StatsFilter(h stats.Handler, fn StatsFilterFunc) stats.Handler {
return &statsFilter{
inner: h,
filter: fn,
}
}

type statsFilter struct {
inner stats.Handler
filter func(info *stats.RPCTagInfo) bool
}

func (s *statsFilter) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
if s.filter(info) {
return context.WithValue(ctx, filterContextKey, struct{}{})
}
return s.inner.TagRPC(ctx, info)
}

func (s *statsFilter) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
if ctx.Value(filterContextKey) != nil {
return
}
s.inner.HandleRPC(ctx, rpcStats)
}

func (s *statsFilter) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return s.inner.TagConn(ctx, info)
}

func (s *statsFilter) HandleConn(ctx context.Context, connStats stats.ConnStats) {
s.inner.HandleConn(ctx, connStats)
}

0 comments on commit b0183dd

Please sign in to comment.