Skip to content

Commit

Permalink
Conditional runtime server for otel mode (#5018)
Browse files Browse the repository at this point in the history
  • Loading branch information
michalpristas authored Jul 3, 2024
1 parent 6328d4b commit db40ac2
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 72 deletions.
1 change: 1 addition & 0 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func New(
tracer,
monitor,
cfg.Settings.GRPC,
runAsOtel,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
require.NoError(t, err)

monitoringMgr := newTestMonitoringMgr()
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, configuration.DefaultGRPCConfig())
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, configuration.DefaultGRPCConfig(), false)
require.NoError(t, err)

caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
Expand Down
112 changes: 59 additions & 53 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ type Manager struct {

// doneChan is closed when Manager is shutting down to signal that any
// pending requests should be canceled.
doneChan chan struct{}
doneChan chan struct{}
runAsOtel bool
}

// NewManager creates a new manager.
Expand All @@ -159,6 +160,7 @@ func NewManager(
tracer *apm.Tracer,
monitor MonitoringManager,
grpcConfig *configuration.GRPCConfig,
runAsOtel bool,
) (*Manager, error) {
ca, err := authority.NewCA()
if err != nil {
Expand Down Expand Up @@ -195,6 +197,7 @@ func NewManager(
grpcConfig: grpcConfig,
serverReady: &atomic.Bool{},
doneChan: make(chan struct{}),
runAsOtel: runAsOtel,
}
return m, nil
}
Expand All @@ -210,58 +213,59 @@ func (m *Manager) Run(ctx context.Context) error {
var (
listener net.Listener
err error
server *grpc.Server
wgServer sync.WaitGroup
)
if m.isLocal {
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
} else {
listener, err = net.Listen("tcp", m.listenAddr)
}
if !m.runAsOtel {
if m.isLocal {
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
} else {
listener, err = net.Listen("tcp", m.listenAddr)
}

if err != nil {
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
}
if err != nil {
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
}

if m.isLocal {
defer ipc.CleanupListener(m.logger, m.listenAddr)
} else {
m.listenPort = listener.Addr().(*net.TCPAddr).Port
}

certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
return errors.New("failed to append root CA")
}
creds := credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
GetCertificate: m.getCertificate,
MinVersion: tls.VersionTLS12,
})

var server *grpc.Server
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
if m.tracer != nil {
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
server = grpc.NewServer(
grpc.UnaryInterceptor(apmInterceptor),
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
} else {
server = grpc.NewServer(
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
}
proto.RegisterElasticAgentServer(server, m)
if m.isLocal {
defer ipc.CleanupListener(m.logger, m.listenAddr)
} else {
m.listenPort = listener.Addr().(*net.TCPAddr).Port
}

// start serving GRPC connections
var wgServer sync.WaitGroup
wgServer.Add(1)
go func() {
defer wgServer.Done()
m.serverLoop(ctx, listener, server)
}()
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
return errors.New("failed to append root CA")
}
creds := credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
GetCertificate: m.getCertificate,
MinVersion: tls.VersionTLS12,
})
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
if m.tracer != nil {
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
server = grpc.NewServer(
grpc.UnaryInterceptor(apmInterceptor),
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
} else {
server = grpc.NewServer(
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
}
proto.RegisterElasticAgentServer(server, m)

// start serving GRPC connections
wgServer.Add(1)
go func() {
defer wgServer.Done()
m.serverLoop(ctx, listener, server)
}()
}

// Start the run loop, which continues on the main goroutine
// until the context is canceled.
Expand All @@ -271,11 +275,13 @@ func (m *Manager) Run(ctx context.Context) error {
m.shutdown()

// Close the rpc listener and wait for serverLoop to return
listener.Close()
wgServer.Wait()
if !m.runAsOtel {
listener.Close()
wgServer.Wait()

// Cancel any remaining connections
server.Stop()
// Cancel any remaining connections
server.Stop()
}
return ctx.Err()
}

Expand Down
55 changes: 37 additions & 18 deletions pkg/component/runtime/manager_fake_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (suite *FakeInputSuite) TestManager_Features() {
agentInfo,
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig())
configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)

managerErrCh := make(chan error)
Expand Down Expand Up @@ -312,7 +313,8 @@ func (suite *FakeInputSuite) TestManager_APM() {
agentInfo,
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig())
configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)

managerErrCh := make(chan error)
Expand Down Expand Up @@ -547,6 +549,7 @@ func (suite *FakeInputSuite) TestManager_Limits() {
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig(),
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -710,6 +713,7 @@ func (suite *FakeInputSuite) TestManager_ShipperLimits() {
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig(),
false,
)
require.NoError(t, err)

Expand Down Expand Up @@ -866,7 +870,8 @@ func (suite *FakeInputSuite) TestManager_BadUnitToGood() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1035,7 +1040,8 @@ func (suite *FakeInputSuite) TestManager_GoodUnitToBad() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
runResultChan := make(chan error, 1)
go func() {
Expand Down Expand Up @@ -1217,7 +1223,8 @@ func (suite *FakeInputSuite) TestManager_NoDeadlock() {

// Create the runtime manager
ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)

// Start the runtime manager in a goroutine, passing its termination state
Expand Down Expand Up @@ -1291,7 +1298,8 @@ func (suite *FakeInputSuite) TestManager_Configure() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1413,7 +1421,8 @@ func (suite *FakeInputSuite) TestManager_RemoveUnit() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1568,7 +1577,8 @@ func (suite *FakeInputSuite) TestManager_ActionState() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1693,7 +1703,8 @@ func (suite *FakeInputSuite) TestManager_Restarts() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1829,7 +1840,8 @@ func (suite *FakeInputSuite) TestManager_Restarts_ConfigKill() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -1973,7 +1985,8 @@ func (suite *FakeInputSuite) TestManager_KeepsRestarting() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -2117,7 +2130,8 @@ func (suite *FakeInputSuite) TestManager_RestartsOnMissedCheckins() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -2236,7 +2250,8 @@ func (suite *FakeInputSuite) TestManager_InvalidAction() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -2360,7 +2375,8 @@ func (suite *FakeInputSuite) TestManager_MultiComponent() {
agentInfo,
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig())
configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)

errCh := make(chan error)
Expand Down Expand Up @@ -2573,7 +2589,8 @@ func (suite *FakeInputSuite) TestManager_LogLevel() {
ai,
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig())
configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)

errCh := make(chan error)
Expand Down Expand Up @@ -2719,7 +2736,8 @@ func (suite *FakeInputSuite) TestManager_Shipper() {
defer cancel()

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down Expand Up @@ -3022,7 +3040,8 @@ func (suite *FakeInputSuite) TestManager_StartStopComponent() {
ai,
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig())
configuration.DefaultGRPCConfig(),
false)
require.NoError(t, err, "could not crete new manager")

managerErrCh := make(chan error)
Expand Down Expand Up @@ -3206,7 +3225,7 @@ func (suite *FakeInputSuite) TestManager_Chunk() {
grpcConfig.MaxMsgSize = grpcDefaultSize * 2 // set to double the default size

ai := &info.AgentInfo{}
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), grpcConfig)
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), grpcConfig, false)
require.NoError(t, err)
errCh := make(chan error)
go func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/component/runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestManager_SimpleComponentErr(t *testing.T) {
apmtest.DiscardTracer,
newTestMonitoringMgr(),
configuration.DefaultGRPCConfig(),
false,
)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit db40ac2

Please sign in to comment.