Skip to content

Commit

Permalink
Wire up in-process resource service client
Browse files Browse the repository at this point in the history
  • Loading branch information
boxofrad committed Apr 12, 2023
1 parent 8fdefff commit a1a9459
Showing 1 changed file with 51 additions and 4 deletions.
55 changes: 51 additions & 4 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import (
"go.etcd.io/bbolt"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
"github.com/hashicorp/consul/agent/consul/fsm"
Expand Down Expand Up @@ -67,11 +69,11 @@ import (
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
"github.com/hashicorp/consul/internal/storage"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
Expand Down Expand Up @@ -427,6 +429,11 @@ type Server struct {

// typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry

// internalResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) without auth.
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient
}

type connHandler interface {
Expand Down Expand Up @@ -754,7 +761,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Initialize external gRPC server
s.setupExternalGRPC(config, s.raftStorageBackend, logger)
s.setupExternalGRPC(config, logger)

// Initialize internal gRPC server.
//
Expand All @@ -771,6 +778,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
})
go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

if err := s.setupInternalResourceService(logger); err != nil {
return nil, err
}

// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
Expand Down Expand Up @@ -1205,7 +1216,7 @@ func (s *Server) setupRPC() error {
}

// Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) {
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
Expand Down Expand Up @@ -1272,12 +1283,48 @@ func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logg

resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: backend,
Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"),
}).Register(s.externalGRPCServer)
}

func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
server := grpc.NewServer()

resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"),
}).Register(server)

pipe := agentgrpc.NewPipeListener()
go server.Serve(pipe)

go func() {
<-s.shutdownCh
server.Stop()
}()

conn, err := grpc.Dial("",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(pipe.DialContext),
grpc.WithBlock(),
)
if err != nil {
server.Stop()
return err
}
go func() {
<-s.shutdownCh
conn.Close()
}()
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
}

// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Info("shutting down server")
Expand Down

0 comments on commit a1a9459

Please sign in to comment.