From a1a94599212010fe8188b6aa83d8c56fe73e86bd Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Wed, 12 Apr 2023 15:25:39 +0100 Subject: [PATCH] Wire up in-process resource service client --- agent/consul/server.go | 55 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/agent/consul/server.go b/agent/consul/server.go index 96b688d6c4a3..8b6013f70ee0 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -33,10 +33,12 @@ import ( "go.etcd.io/bbolt" "golang.org/x/time/rate" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/acl/resolver" "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/consul/fsm" @@ -67,11 +69,11 @@ import ( "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource/demo" - "github.com/hashicorp/consul/internal/storage" raftstorage "github.com/hashicorp/consul/internal/storage/raft" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib/routine" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto/private/pbsubscribe" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" @@ -427,6 +429,11 @@ type Server struct { // typeRegistry contains Consul's registered resource types. typeRegistry resource.Registry + + // internalResourceServiceClient is a client that can be used to communicate + // with the Resource Service in-process (i.e. not via the network) without auth. + // It should only be used for purely-internal workloads, such as controllers. + internalResourceServiceClient pbresource.ResourceServiceClient } type connHandler interface { @@ -754,7 +761,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) // Initialize external gRPC server - s.setupExternalGRPC(config, s.raftStorageBackend, logger) + s.setupExternalGRPC(config, logger) // Initialize internal gRPC server. // @@ -771,6 +778,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom }) go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + if err := s.setupInternalResourceService(logger); err != nil { + return nil, err + } + // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. s.initAutopilot(config) @@ -1205,7 +1216,7 @@ func (s *Server) setupRPC() error { } // Initialize and register services on external gRPC server. -func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) { +func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) { s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{ ACLsEnabled: s.config.ACLsEnabled, ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) { @@ -1272,12 +1283,48 @@ func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logg resourcegrpc.NewServer(resourcegrpc.Config{ Registry: s.typeRegistry, - Backend: backend, + Backend: s.raftStorageBackend, ACLResolver: s.ACLResolver, Logger: logger.Named("grpc-api.resource"), }).Register(s.externalGRPCServer) } +func (s *Server) setupInternalResourceService(logger hclog.Logger) error { + server := grpc.NewServer() + + resourcegrpc.NewServer(resourcegrpc.Config{ + Registry: s.typeRegistry, + Backend: s.raftStorageBackend, + ACLResolver: resolver.DANGER_NO_AUTH{}, + Logger: logger.Named("grpc-api.resource"), + }).Register(server) + + pipe := agentgrpc.NewPipeListener() + go server.Serve(pipe) + + go func() { + <-s.shutdownCh + server.Stop() + }() + + conn, err := grpc.Dial("", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(pipe.DialContext), + grpc.WithBlock(), + ) + if err != nil { + server.Stop() + return err + } + go func() { + <-s.shutdownCh + conn.Close() + }() + s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn) + + return nil +} + // Shutdown is used to shutdown the server func (s *Server) Shutdown() error { s.logger.Info("shutting down server")