diff --git a/client/client.go b/client/client.go index 414e156ddcbf5..fdc4ae7d9e0f7 100644 --- a/client/client.go +++ b/client/client.go @@ -18,6 +18,7 @@ import ( "github.com/moby/buildkit/session/grpchijack" "github.com/moby/buildkit/util/appdefaults" "github.com/moby/buildkit/util/grpcerrors" + "github.com/moby/buildkit/util/grpcutil" "github.com/moby/buildkit/util/tracing/otlptracegrpc" "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -29,6 +30,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/stats" ) type Client struct { @@ -48,9 +50,6 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error } needDialer := true - var unary []grpc.UnaryClientInterceptor - var stream []grpc.StreamClientInterceptor - var customTracer bool // allows manually setting disabling tracing even if tracer in context var tracerProvider trace.TracerProvider var tracerDelegate TracerDelegate @@ -101,9 +100,14 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error } if tracerProvider != nil { - propagators := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) - unary = append(unary, filterInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators)))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 - stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 + gopts = append(gopts, grpc.WithStatsHandler( + grpcutil.StatsFilter(otelgrpc.NewClientHandler( + otelgrpc.WithTracerProvider(tracerProvider), + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}), + ), + ), traceServiceExportFilter), + )) } if needDialer { @@ -145,12 +149,8 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error } gopts = append(gopts, grpc.WithAuthority(authority)) - - unary = append(unary, grpcerrors.UnaryClientInterceptor) - stream = append(stream, grpcerrors.StreamClientInterceptor) - - gopts = append(gopts, grpc.WithChainUnaryInterceptor(unary...)) - gopts = append(gopts, grpc.WithChainStreamInterceptor(stream...)) + gopts = append(gopts, grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor)) + gopts = append(gopts, grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor)) gopts = append(gopts, customDialOptions...) conn, err := grpc.DialContext(ctx, address, gopts...) @@ -386,13 +386,8 @@ func resolveDialer(address string) (func(context.Context, string) (net.Conn, err return nil, nil } -func filterInterceptor(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if strings.HasSuffix(method, "opentelemetry.proto.collector.trace.v1.TraceService/Export") { - return invoker(ctx, method, req, reply, cc, opts...) - } - return intercept(ctx, method, req, reply, cc, invoker, opts...) - } +func traceServiceExportFilter(info *stats.RPCTagInfo) bool { + return strings.HasSuffix(info.FullMethodName, "opentelemetry.proto.collector.trace.v1.TraceService/Export") } type withGRPCDialOption struct { diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 3a3ff16f2b1ef..14914d84c6f0d 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -22,7 +22,6 @@ import ( sddaemon "github.com/coreos/go-systemd/v22/daemon" "github.com/docker/docker/pkg/reexec" "github.com/gofrs/flock" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/moby/buildkit/cache/remotecache" "github.com/moby/buildkit/cache/remotecache/azblob" "github.com/moby/buildkit/cache/remotecache/gha" @@ -47,6 +46,7 @@ import ( "github.com/moby/buildkit/util/archutil" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/grpcerrors" + "github.com/moby/buildkit/util/grpcutil" "github.com/moby/buildkit/util/profiler" "github.com/moby/buildkit/util/resolver" "github.com/moby/buildkit/util/stack" @@ -62,16 +62,15 @@ import ( "github.com/urfave/cli" "go.etcd.io/bbolt" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/health" healthv1 "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" + "google.golang.org/grpc/stats" ) func init() { @@ -270,16 +269,17 @@ func main() { return err } - streamTracer := otelgrpc.StreamServerInterceptor( //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 + statsHandler := grpcutil.StatsFilter(otelgrpc.NewServerHandler( otelgrpc.WithTracerProvider(tp), otelgrpc.WithMeterProvider(mp), otelgrpc.WithPropagators(propagators), - ) + ), traceServiceExportFilter) - unary := grpc_middleware.ChainUnaryServer(unaryInterceptor(ctx, tp, mp), grpcerrors.UnaryServerInterceptor) - stream := grpc_middleware.ChainStreamServer(streamTracer, grpcerrors.StreamServerInterceptor) - - opts := []grpc.ServerOption{grpc.UnaryInterceptor(unary), grpc.StreamInterceptor(stream)} + opts := []grpc.ServerOption{ + grpc.StatsHandler(statsHandler), + grpc.ChainUnaryInterceptor(unaryInterceptor, grpcerrors.UnaryServerInterceptor), + grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor), + } server := grpc.NewServer(opts...) // relative path does not work with nightlyone/lockfile @@ -660,38 +660,23 @@ func getListener(addr string, uid, gid int, tlsConfig *tls.Config) (net.Listener } } -func unaryInterceptor(globalCtx context.Context, tp trace.TracerProvider, mp metric.MeterProvider) grpc.UnaryServerInterceptor { - withTrace := otelgrpc.UnaryServerInterceptor( //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 - otelgrpc.WithTracerProvider(tp), - otelgrpc.WithMeterProvider(mp), - otelgrpc.WithPropagators(propagators), - ) - - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - ctx, cancel := context.WithCancelCause(ctx) - defer cancel(errors.WithStack(context.Canceled)) - - go func() { - select { - case <-ctx.Done(): - case <-globalCtx.Done(): - cancel(context.Cause(globalCtx)) - } - }() +func traceServiceExportFilter(info *stats.RPCTagInfo) bool { + return strings.HasSuffix(info.FullMethodName, "opentelemetry.proto.collector.trace.v1.TraceService/Export") +} - if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") { - return handler(ctx, req) - } +func unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + if strings.HasSuffix(info.FullMethod, "opentelemetry.proto.collector.trace.v1.TraceService/Export") { + return handler(ctx, req) + } - resp, err = withTrace(ctx, req, info, handler) - if err != nil { - bklog.G(ctx).Errorf("%s returned error: %v", info.FullMethod, err) - if logrus.GetLevel() >= logrus.DebugLevel { - fmt.Fprintf(os.Stderr, "%+v", stack.Formatter(grpcerrors.FromGRPC(err))) - } + resp, err = handler(ctx, req) + if err != nil { + bklog.G(ctx).Errorf("%s returned error: %v", info.FullMethod, err) + if logrus.GetLevel() >= logrus.DebugLevel { + fmt.Fprintf(os.Stderr, "%+v", stack.Formatter(grpcerrors.FromGRPC(err))) } - return } + return resp, err } func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) { diff --git a/session/grpc.go b/session/grpc.go index 0bc0652546b1b..62cd50b3d16f8 100644 --- a/session/grpc.go +++ b/session/grpc.go @@ -8,9 +8,9 @@ import ( "time" "github.com/containerd/containerd/defaults" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/grpcerrors" + "github.com/moby/buildkit/util/grpcutil" "github.com/pkg/errors" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -31,9 +31,6 @@ func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) { } func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) { - var unary []grpc.UnaryClientInterceptor - var stream []grpc.StreamClientInterceptor - var dialCount int64 dialer := grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { if c := atomic.AddInt64(&dialCount, 1); c > 1 { @@ -47,26 +44,16 @@ func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc. grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)), grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)), + grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor), + grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor), } if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { - unary = append(unary, filterClient(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators)))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 - stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 - } - - unary = append(unary, grpcerrors.UnaryClientInterceptor) - stream = append(stream, grpcerrors.StreamClientInterceptor) - - if len(unary) == 1 { - dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(unary[0])) - } else if len(unary) > 1 { - dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unary...))) - } - - if len(stream) == 1 { - dialOpts = append(dialOpts, grpc.WithStreamInterceptor(stream[0])) - } else if len(stream) > 1 { - dialOpts = append(dialOpts, grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(stream...))) + statsHandler := grpcutil.StatsFilter(otelgrpc.NewClientHandler( + otelgrpc.WithTracerProvider(span.TracerProvider()), + otelgrpc.WithPropagators(propagators), + ), filterHealthCheck) + dialOpts = append(dialOpts, grpc.WithStatsHandler(statsHandler)) } cc, err := grpc.DialContext(ctx, "localhost", dialOpts...) diff --git a/session/session.go b/session/session.go index 5a1ffcb58eeca..1f4cdfb017741 100644 --- a/session/session.go +++ b/session/session.go @@ -6,9 +6,9 @@ import ( "strings" "sync" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/util/grpcerrors" + "github.com/moby/buildkit/util/grpcutil" "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/propagation" @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/stats" ) const ( @@ -53,29 +54,17 @@ type Session struct { func NewSession(ctx context.Context, name, sharedKey string) (*Session, error) { id := identity.NewID() - var unary []grpc.UnaryServerInterceptor - var stream []grpc.StreamServerInterceptor - - serverOpts := []grpc.ServerOption{} - - if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { - unary = append(unary, filterServer(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators)))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 - stream = append(stream, otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators))) //nolint:staticcheck // TODO(thaJeztah): ignore SA1019 for deprecated options: see https://github.com/moby/buildkit/issues/4681 + serverOpts := []grpc.ServerOption{ + grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), + grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor), } - unary = append(unary, grpcerrors.UnaryServerInterceptor) - stream = append(stream, grpcerrors.StreamServerInterceptor) - - if len(unary) == 1 { - serverOpts = append(serverOpts, grpc.UnaryInterceptor(unary[0])) - } else if len(unary) > 1 { - serverOpts = append(serverOpts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unary...))) - } - - if len(stream) == 1 { - serverOpts = append(serverOpts, grpc.StreamInterceptor(stream[0])) - } else if len(stream) > 1 { - serverOpts = append(serverOpts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(stream...))) + if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { + statsHandler := grpcutil.StatsFilter(otelgrpc.NewServerHandler( + otelgrpc.WithTracerProvider(span.TracerProvider()), + otelgrpc.WithPropagators(propagators), + ), filterHealthCheck) + serverOpts = append(serverOpts, grpc.StatsHandler(statsHandler)) } s := &Session{ @@ -168,21 +157,6 @@ func MethodURL(s, m string) string { return "/" + s + "/" + m } -// updates needed in opentelemetry-contrib to avoid this -func filterServer(intercept grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - if strings.HasSuffix(info.FullMethod, "Health/Check") { - return handler(ctx, req) - } - return intercept(ctx, req, info, handler) - } -} - -func filterClient(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if strings.HasSuffix(method, "Health/Check") { - return invoker(ctx, method, req, reply, cc, opts...) - } - return intercept(ctx, method, req, reply, cc, invoker, opts...) - } +func filterHealthCheck(info *stats.RPCTagInfo) bool { + return strings.HasSuffix(info.FullMethodName, "Health/Check") } diff --git a/util/grpcutil/stats_filter.go b/util/grpcutil/stats_filter.go new file mode 100644 index 0000000000000..f93d0c520b155 --- /dev/null +++ b/util/grpcutil/stats_filter.go @@ -0,0 +1,47 @@ +package grpcutil + +import ( + "context" + + "google.golang.org/grpc/stats" +) + +type contextKey int + +const filterContextKey contextKey = iota + +type StatsFilterFunc func(info *stats.RPCTagInfo) bool + +func StatsFilter(h stats.Handler, fn StatsFilterFunc) stats.Handler { + return &statsFilter{ + inner: h, + filter: fn, + } +} + +type statsFilter struct { + inner stats.Handler + filter func(info *stats.RPCTagInfo) bool +} + +func (s *statsFilter) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + if s.filter(info) { + return context.WithValue(ctx, filterContextKey, struct{}{}) + } + return s.inner.TagRPC(ctx, info) +} + +func (s *statsFilter) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) { + if ctx.Value(filterContextKey) != nil { + return + } + s.inner.HandleRPC(ctx, rpcStats) +} + +func (s *statsFilter) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + return s.inner.TagConn(ctx, info) +} + +func (s *statsFilter) HandleConn(ctx context.Context, connStats stats.ConnStats) { + s.inner.HandleConn(ctx, connStats) +}