Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't run any agent service when running otel subcommand #5748

Merged
merged 6 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 6 additions & 22 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
"github.com/elastic/elastic-agent/internal/pkg/composable"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/otel"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
Expand All @@ -47,13 +46,11 @@ func New(
testingMode bool,
fleetInitTimeout time.Duration,
disableMonitoring bool,
runAsOtel bool,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {

err := version.InitVersionError()
if err != nil && !runAsOtel {
// ignore this error when running in otel mode
if err != nil {
// non-fatal error, log a warning and move on
log.With("error.message", err).Warnf("Error initializing version information: falling back to %s", release.Version())
}
Expand Down Expand Up @@ -92,13 +89,7 @@ func New(
log.Infof("Loading baseline config from %v", pathConfigFile)
rawConfig, err = config.LoadFile(pathConfigFile)
if err != nil {
if !runAsOtel {
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}

// initialize with empty config, configuration file is not necessary in otel mode,
// best effort is fine
rawConfig = config.New()
return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
}
if err := info.InjectAgentConfig(rawConfig); err != nil {
Expand All @@ -124,7 +115,6 @@ func New(
tracer,
monitor,
cfg.Settings.GRPC,
runAsOtel,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
Expand All @@ -141,9 +131,6 @@ func New(

// testing mode uses a config manager that takes configuration from over the control protocol
configMgr = newTestingModeConfigManager(log)
} else if runAsOtel {
// ignoring configuration in elastic-agent.yml
configMgr = otel.NewOtelModeConfigManager()
} else if configuration.IsStandalone(cfg.Fleet) {
log.Info("Parsed configuration and determined agent is managed locally")

Expand Down Expand Up @@ -187,13 +174,10 @@ func New(
}
}

var varsManager composable.Controller
if !runAsOtel {
// no need for vars in otel mode
varsManager, err = composable.New(log, rawConfig, composableManaged)
if err != nil {
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}
// no need for vars in otel mode
varsManager, err := composable.New(log, rawConfig, composableManaged)
if err != nil {
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
}

coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, compModifiers...)
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func TestLimitsLog(t *testing.T) {
true, // testingMode
time.Millisecond, // fleetInitTimeout
true, // disable monitoring
false, // not otel mode
)
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
monitoringMgr := newTestMonitoringMgr()
cfg := configuration.DefaultGRPCConfig()
cfg.Port = 0
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg, false)
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
require.NoError(t, err)

caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
Expand Down
42 changes: 1 addition & 41 deletions internal/pkg/agent/cmd/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@ package cmd

import (
"context"
goerrors "errors"
"sync"

"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/elastic/elastic-agent-libs/service"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/internal/pkg/otel"
)
Expand Down Expand Up @@ -79,42 +76,5 @@ func runCollector(cmdCtx context.Context, configFiles []string) error {
defer cancel()
go service.ProcessWindowsControlEvents(stopCollector)

var otelStartWg sync.WaitGroup
var errs []error
var awaiters awaiters

otelAwaiter := make(chan struct{})
awaiters = append(awaiters, otelAwaiter)

otelStartWg.Add(1)
go func() {
otelStartWg.Done()
if err := otel.Run(ctx, stop, configFiles); err != nil {
errs = append(errs, err)
// otel collector finished with an error, exit run loop
cancel()
}

// close awaiter handled in run loop
close(otelAwaiter)
}()

// wait for otel to start
otelStartWg.Wait()

if err := runElasticAgent(
ctx,
cancel,
nil, // no config overrides
stop, // service hook
false, // not in testing mode
0, // no fleet config
true, // is otel mode
awaiters, // wait for otel to finish
); err != nil && !errors.Is(err, context.Canceled) {
errs = append(errs, err)
}

return goerrors.Join(errs...)

return otel.Run(ctx, stop, configFiles)
}
27 changes: 7 additions & 20 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ const (

type (
cfgOverrider func(cfg *configuration.Configuration)
awaiters []<-chan struct{}
)

func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -155,7 +154,7 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
defer cancel()
go service.ProcessWindowsControlEvents(stopBeat)

return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, false, nil, modifiers...)
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
}

func logReturn(l *logger.Logger, err error) error {
Expand All @@ -165,8 +164,8 @@ func logReturn(l *logger.Logger, err error) error {
return err
}

func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override, runAsOtel)
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
cfg, err := loadConfig(ctx, override)
if err != nil {
return err
}
Expand Down Expand Up @@ -205,7 +204,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
pathConfigFile := paths.AgentConfigFile()

// agent ID needs to stay empty in bootstrap mode
createAgentID := !runAsOtel
createAgentID := true
if cfg.Fleet != nil && cfg.Fleet.Server != nil && cfg.Fleet.Server.Bootstrap {
createAgentID = false
}
Expand Down Expand Up @@ -285,7 +284,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
l.Info("APM instrumentation disabled")
}

coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
if err != nil {
return logReturn(l, err)
}
Expand Down Expand Up @@ -397,9 +396,6 @@ LOOP:
}
cancel()
err = <-appErr
for _, a := range awaiters {
<-a // wait for awaiter to be done
}

if logShutdown {
l.Info("Shutting down completed.")
Expand All @@ -410,16 +406,7 @@ LOOP:
return logReturn(l, err)
}

func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
if runAsOtel {
defaultCfg := configuration.DefaultConfiguration()
// disable monitoring to avoid injection of monitoring components
// in case inputs are not empty
defaultCfg.Settings.MonitoringConfig.Enabled = false
defaultCfg.Settings.V1MonitoringEnabled = false
return defaultCfg, nil
}

func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
pathConfigFile := paths.ConfigFile()
rawConfig, err := config.LoadFile(pathConfigFile)
if err != nil {
Expand Down Expand Up @@ -592,7 +579,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
errors.M("path", enrollPath)))
}
logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
return loadConfig(ctx, override, false)
return loadConfig(ctx, override)
}

func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {
Expand Down
107 changes: 50 additions & 57 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ type Manager struct {

// doneChan is closed when Manager is shutting down to signal that any
// pending requests should be canceled.
doneChan chan struct{}
runAsOtel bool
doneChan chan struct{}
}

// NewManager creates a new manager.
Expand All @@ -157,7 +156,6 @@ func NewManager(
tracer *apm.Tracer,
monitor MonitoringManager,
grpcConfig *configuration.GRPCConfig,
runAsOtel bool,
) (*Manager, error) {
ca, err := authority.NewCA()
if err != nil {
Expand Down Expand Up @@ -193,7 +191,6 @@ func NewManager(
grpcConfig: grpcConfig,
serverReady: make(chan struct{}),
doneChan: make(chan struct{}),
runAsOtel: runAsOtel,
}
return m, nil
}
Expand All @@ -212,56 +209,54 @@ func (m *Manager) Run(ctx context.Context) error {
server *grpc.Server
wgServer sync.WaitGroup
)
if !m.runAsOtel {
if m.isLocal {
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
} else {
listener, err = net.Listen("tcp", m.listenAddr)
}

if err != nil {
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
}

if m.isLocal {
defer ipc.CleanupListener(m.logger, m.listenAddr)
} else {
m.listenPort = listener.Addr().(*net.TCPAddr).Port
}
if m.isLocal {
listener, err = ipc.CreateListener(m.logger, m.listenAddr)
} else {
listener, err = net.Listen("tcp", m.listenAddr)
}

certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
return errors.New("failed to append root CA")
}
creds := credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
GetCertificate: m.getCertificate,
MinVersion: tls.VersionTLS12,
})
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
if m.tracer != nil {
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
server = grpc.NewServer(
grpc.UnaryInterceptor(apmInterceptor),
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
} else {
server = grpc.NewServer(
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
}
proto.RegisterElasticAgentServer(server, m)
if err != nil {
return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
}

// start serving GRPC connections
wgServer.Add(1)
go func() {
defer wgServer.Done()
m.serverLoop(ctx, listener, server)
}()
if m.isLocal {
defer ipc.CleanupListener(m.logger, m.listenAddr)
} else {
m.listenPort = listener.Addr().(*net.TCPAddr).Port
}

certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
return errors.New("failed to append root CA")
}
creds := credentials.NewTLS(&tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: certPool,
GetCertificate: m.getCertificate,
MinVersion: tls.VersionTLS12,
})
m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
if m.tracer != nil {
apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
server = grpc.NewServer(
grpc.UnaryInterceptor(apmInterceptor),
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
} else {
server = grpc.NewServer(
grpc.Creds(creds),
grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
)
}
proto.RegisterElasticAgentServer(server, m)

// start serving GRPC connections
wgServer.Add(1)
go func() {
defer wgServer.Done()
m.serverLoop(ctx, listener, server)
}()

// Start the run loop, which continues on the main goroutine
// until the context is canceled.
Expand All @@ -271,13 +266,11 @@ func (m *Manager) Run(ctx context.Context) error {
m.shutdown()

// Close the rpc listener and wait for serverLoop to return
if !m.runAsOtel {
listener.Close()
wgServer.Wait()
listener.Close()
wgServer.Wait()

// Cancel any remaining connections
server.Stop()
}
// Cancel any remaining connections
server.Stop()
return ctx.Err()
}

Expand Down
Loading