Skip to content

Commit

Permalink
server: wire up in-process Resource Service (#16978)
Browse files Browse the repository at this point in the history
  • Loading branch information
boxofrad authored Apr 18, 2023
1 parent 0c846fa commit a37a441
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 21 deletions.
15 changes: 15 additions & 0 deletions acl/resolver/danger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package resolver

import "github.com/hashicorp/consul/acl"

// DANGER_NO_AUTH implements an ACL resolver short-circuit authorization in
// cases where it is handled somewhere else or expressly not required.
type DANGER_NO_AUTH struct{}

// ResolveTokenAndDefaultMeta returns an authorizer with unfettered permissions.
func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (Result, error) {
return Result{Authorizer: acl.ManageAll()}, nil
}
71 changes: 60 additions & 11 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import (
"go.etcd.io/bbolt"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/hashicorp/consul-net-rpc/net/rpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
"github.com/hashicorp/consul/agent/consul/fsm"
Expand Down Expand Up @@ -67,11 +69,11 @@ import (
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
"github.com/hashicorp/consul/internal/storage"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
Expand Down Expand Up @@ -424,6 +426,14 @@ type Server struct {
// routineManager is responsible for managing longer running go routines
// run by the Server
routineManager *routine.Manager

// typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry

// internalResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) without auth.
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient
}

type connHandler interface {
Expand Down Expand Up @@ -486,6 +496,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
publisher: flat.EventPublisher,
incomingRPCLimiter: incomingRPCLimiter,
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
typeRegistry: resource.NewRegistry(),
}
incomingRPCLimiter.Register(s)

Expand Down Expand Up @@ -750,7 +761,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

// Initialize external gRPC server
s.setupExternalGRPC(config, s.raftStorageBackend, logger)
s.setupExternalGRPC(config, logger)

// Initialize internal gRPC server.
//
Expand All @@ -767,6 +778,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
})
go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

if err := s.setupInternalResourceService(logger); err != nil {
return nil, err
}

// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
Expand Down Expand Up @@ -803,6 +818,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return nil, err
}

if s.config.DevMode {
demo.Register(s.typeRegistry)
}

return s, nil
}

Expand Down Expand Up @@ -1197,7 +1216,7 @@ func (s *Server) setupRPC() error {
}

// Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) {
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
Expand Down Expand Up @@ -1262,20 +1281,50 @@ func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logg
})
s.peerStreamServer.Register(s.externalGRPCServer)

registry := resource.NewRegistry()

if s.config.DevMode {
demo.Register(registry)
}

resourcegrpc.NewServer(resourcegrpc.Config{
Registry: registry,
Backend: backend,
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"),
}).Register(s.externalGRPCServer)
}

func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
server := grpc.NewServer()

resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"),
}).Register(server)

pipe := agentgrpc.NewPipeListener()
go server.Serve(pipe)

go func() {
<-s.shutdownCh
server.Stop()
}()

conn, err := grpc.Dial("",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(pipe.DialContext),
grpc.WithBlock(),
)
if err != nil {
server.Stop()
return err
}
go func() {
<-s.shutdownCh
conn.Close()
}()
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)

return nil
}

// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Info("shutting down server")
Expand Down
11 changes: 1 addition & 10 deletions agent/grpc-external/services/peerstream/subscription_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/stream"
Expand Down Expand Up @@ -78,7 +77,7 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
}
deps := submatview.LocalMaterializerDeps{
Backend: e.sub,
ACLResolver: DANGER_NO_AUTH{},
ACLResolver: resolver.DANGER_NO_AUTH{},
Deps: submatview.Deps{
View: newExportedServicesView(),
Logger: e.logger,
Expand All @@ -88,14 +87,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
return submatview.NewLocalMaterializer(deps), nil
}

// DANGER_NO_AUTH implements submatview.ACLResolver to short-circuit authorization
// in cases where it is handled somewhere else (e.g. in an RPC handler).
type DANGER_NO_AUTH struct{}

func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error) {
return resolver.Result{Authorizer: acl.ManageAll()}, nil
}

// Type implements submatview.Request
func (e *exportedServiceRequest) Type() string {
return "leader.peering.stream.exportedServiceRequest"
Expand Down
84 changes: 84 additions & 0 deletions agent/grpc-internal/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package internal

import (
"context"
"errors"
"net"
"sync/atomic"
)

// ErrPipeClosed is returned when calling Accept or DialContext on a closed
// PipeListener.
var ErrPipeClosed = errors.New("pipe listener has been closed")

// PipeListener implements the net.Listener interface using a net.Pipe so that
// you can interact with a gRPC service in the same process without going over
// the network.
type PipeListener struct {
conns chan net.Conn
closed atomic.Bool
done chan struct{}
}

var _ net.Listener = (*PipeListener)(nil)

// NewPipeListener creates a new PipeListener.
func NewPipeListener() *PipeListener {
return &PipeListener{
conns: make(chan net.Conn),
done: make(chan struct{}),
}
}

// Accept a connection.
func (p *PipeListener) Accept() (net.Conn, error) {
select {
case conn := <-p.conns:
return conn, nil
case <-p.done:
return nil, ErrPipeClosed
}
}

// Close the listener.
func (p *PipeListener) Close() error {
if p.closed.CompareAndSwap(false, true) {
close(p.done)
}
return nil
}

// DialContext dials the server over an in-process pipe.
func (p *PipeListener) DialContext(ctx context.Context, _ string) (net.Conn, error) {
if p.closed.Load() {
return nil, ErrPipeClosed
}

serverConn, clientConn := net.Pipe()

select {
// Send the server connection to whatever is accepting connections from the
// PipeListener. This will block until something has accepted the conn.
case p.conns <- serverConn:
return clientConn, nil
case <-ctx.Done():
serverConn.Close()
clientConn.Close()
return nil, ctx.Err()
case <-p.done:
serverConn.Close()
clientConn.Close()
return nil, ErrPipeClosed
}
}

// Add returns the listener's address.
func (*PipeListener) Addr() net.Addr { return pipeAddr{} }

type pipeAddr struct{}

func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return "pipe" }
70 changes: 70 additions & 0 deletions agent/grpc-internal/pipe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package internal

import (
"bufio"
"context"
"net"
"testing"

"github.com/stretchr/testify/require"
)

func TestPipeListener_RoundTrip(t *testing.T) {
lis := NewPipeListener()
t.Cleanup(func() { _ = lis.Close() })

go echoServer(lis)

conn, err := lis.DialContext(context.Background(), "")
require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() })

input := []byte("Hello World\n")
_, err = conn.Write(input)
require.NoError(t, err)

output := make([]byte, len(input))
_, err = conn.Read(output)
require.NoError(t, err)

require.Equal(t, string(input), string(output))
}

func TestPipeListener_Closed(t *testing.T) {
lis := NewPipeListener()
require.NoError(t, lis.Close())

_, err := lis.Accept()
require.ErrorIs(t, err, ErrPipeClosed)

_, err = lis.DialContext(context.Background(), "")
require.ErrorIs(t, err, ErrPipeClosed)
}

func echoServer(lis net.Listener) {
handleConn := func(conn net.Conn) {
defer conn.Close()

reader := bufio.NewReader(conn)
for {
msg, err := reader.ReadBytes('\n')
if err != nil {
return
}
if _, err := conn.Write(msg); err != nil {
return
}
}
}

for {
conn, err := lis.Accept()
if err != nil {
return
}
go handleConn(conn)
}
}

0 comments on commit a37a441

Please sign in to comment.