diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go
index eb7fbe1ab3f..b72ff5664b5 100644
--- a/internal/pkg/agent/application/application.go
+++ b/internal/pkg/agent/application/application.go
@@ -28,7 +28,6 @@ import (
 	"github.com/elastic/elastic-agent/internal/pkg/capabilities"
 	"github.com/elastic/elastic-agent/internal/pkg/composable"
 	"github.com/elastic/elastic-agent/internal/pkg/config"
-	"github.com/elastic/elastic-agent/internal/pkg/otel"
 	"github.com/elastic/elastic-agent/internal/pkg/release"
 	"github.com/elastic/elastic-agent/pkg/component"
 	"github.com/elastic/elastic-agent/pkg/component/runtime"
@@ -47,13 +46,11 @@ func New(
 	testingMode bool,
 	fleetInitTimeout time.Duration,
 	disableMonitoring bool,
-	runAsOtel bool,
 	modifiers ...component.PlatformModifier,
 ) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {
 	err := version.InitVersionError()
-	if err != nil && !runAsOtel {
-		// ignore this error when running in otel mode
+	if err != nil {
 		// non-fatal error, log a warning and move on
 		log.With("error.message", err).Warnf("Error initializing version information: falling back to %s", release.Version())
 	}
@@ -92,13 +89,7 @@ func New(
 		log.Infof("Loading baseline config from %v", pathConfigFile)
 		rawConfig, err = config.LoadFile(pathConfigFile)
 		if err != nil {
-			if !runAsOtel {
-				return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
-			}
-
-			// initialize with empty config, configuration file is not necessary in otel mode,
-			// best effort is fine
-			rawConfig = config.New()
+			return nil, nil, nil, fmt.Errorf("failed to load configuration: %w", err)
 		}
 	}
 	if err := info.InjectAgentConfig(rawConfig); err != nil {
@@ -124,7 +115,6 @@ func New(
 		tracer,
 		monitor,
 		cfg.Settings.GRPC,
-		runAsOtel,
 	)
 	if err != nil {
 		return nil, nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
 	}
@@ -141,9 +131,6 @@ func New(
 		// testing mode uses a config manager that takes configuration from over the control protocol
 		configMgr = newTestingModeConfigManager(log)
-	} else if runAsOtel {
-		// ignoring configuration in elastic-agent.yml
-		configMgr = otel.NewOtelModeConfigManager()
 	} else if configuration.IsStandalone(cfg.Fleet) {
 		log.Info("Parsed configuration and determined agent is managed locally")
@@ -189,13 +176,10 @@ func New(
 		}
 	}
 
-	var varsManager composable.Controller
-	if !runAsOtel {
-		// no need for vars in otel mode
-		varsManager, err = composable.New(log, rawConfig, composableManaged)
-		if err != nil {
-			return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
-		}
+	// otel mode no longer goes through New, so the vars manager is always created
+	varsManager, err := composable.New(log, rawConfig, composableManaged)
+	if err != nil {
+		return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
 	}
 
 	coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, compModifiers...)
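Note: for callers, the visible effect of this file's change is one argument fewer in application.New. The sketch below simply mirrors the updated call site in internal/pkg/agent/cmd/run.go (see the run.go hunk further down); it introduces nothing beyond what the diff itself shows:

```go
// application.New no longer takes a runAsOtel bool; otel mode has its own
// entrypoint and never reaches this constructor anymore.
coord, configMgr, composable, err := application.New(
	ctx, l, baseLogger, logLvl, agentInfo, rex, tracer,
	testingMode,      // testing mode toggle
	fleetInitTimeout, // fleet init timeout
	configuration.IsFleetServerBootstrap(cfg.Fleet), // disable monitoring during fleet-server bootstrap
	modifiers...,
)
if err != nil {
	return logReturn(l, err)
}
```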
diff --git a/internal/pkg/agent/application/application_test.go b/internal/pkg/agent/application/application_test.go
index d2118bd47ff..275ca0df6a1 100644
--- a/internal/pkg/agent/application/application_test.go
+++ b/internal/pkg/agent/application/application_test.go
@@ -63,7 +63,6 @@ func TestLimitsLog(t *testing.T) {
 		true,             // testingMode
 		time.Millisecond, // fleetInitTimeout
 		true,             // disable monitoring
-		false,            // not otel mode
 	)
 	require.NoError(t, err)
 
diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go
index dfb8b7c97b2..23f323fee54 100644
--- a/internal/pkg/agent/application/coordinator/coordinator_test.go
+++ b/internal/pkg/agent/application/coordinator/coordinator_test.go
@@ -895,7 +895,7 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
 	monitoringMgr := newTestMonitoringMgr()
 	cfg := configuration.DefaultGRPCConfig()
 	cfg.Port = 0
-	rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg, false)
+	rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
 	require.NoError(t, err)
 
 	caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
diff --git a/internal/pkg/agent/cmd/otel.go b/internal/pkg/agent/cmd/otel.go
index d23c308d30f..eaad5d7bd11 100644
--- a/internal/pkg/agent/cmd/otel.go
+++ b/internal/pkg/agent/cmd/otel.go
@@ -8,14 +8,11 @@ package cmd
 
 import (
 	"context"
-	goerrors "errors"
-	"sync"
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
 
 	"github.com/elastic/elastic-agent-libs/service"
-	"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
 	"github.com/elastic/elastic-agent/internal/pkg/cli"
 	"github.com/elastic/elastic-agent/internal/pkg/otel"
 )
@@ -79,42 +76,5 @@ func runCollector(cmdCtx context.Context, configFiles []string) error {
 	defer cancel()
 	go service.ProcessWindowsControlEvents(stopCollector)
 
-	var otelStartWg sync.WaitGroup
-	var errs []error
-	var awaiters awaiters
-
-	otelAwaiter := make(chan struct{})
-	awaiters = append(awaiters, otelAwaiter)
-
-	otelStartWg.Add(1)
-	go func() {
-		otelStartWg.Done()
-		if err := otel.Run(ctx, stop, configFiles); err != nil {
-			errs = append(errs, err)
-			// otel collector finished with an error, exit run loop
-			cancel()
-		}
-
-		// close awaiter handled in run loop
-		close(otelAwaiter)
-	}()
-
-	// wait for otel to start
-	otelStartWg.Wait()
-
-	if err := runElasticAgent(
-		ctx,
-		cancel,
-		nil,      // no config overrides
-		stop,     // service hook
-		false,    // not in testing mode
-		0,        // no fleet config
-		true,     // is otel mode
-		awaiters, // wait for otel to finish
-	); err != nil && !errors.Is(err, context.Canceled) {
-		errs = append(errs, err)
-	}
-
-	return goerrors.Join(errs...)
-
+	return otel.Run(ctx, stop, configFiles)
 }
diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go
index 80732a3b997..d0d231267f9 100644
--- a/internal/pkg/agent/cmd/run.go
+++ b/internal/pkg/agent/cmd/run.go
@@ -64,7 +64,6 @@ const (
 
 type (
 	cfgOverrider func(cfg *configuration.Configuration)
-	awaiters     []<-chan struct{}
 )
 
 func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
@@ -155,7 +154,7 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
 	defer cancel()
 	go service.ProcessWindowsControlEvents(stopBeat)
 
-	return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, false, nil, modifiers...)
+	return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
 }
 
 func logReturn(l *logger.Logger, err error) error {
@@ -165,8 +164,8 @@ func logReturn(l *logger.Logger, err error) error {
 	return err
 }
 
-func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
-	cfg, err := loadConfig(ctx, override, runAsOtel)
+func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
+	cfg, err := loadConfig(ctx, override)
 	if err != nil {
 		return err
 	}
@@ -205,7 +204,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
 	pathConfigFile := paths.AgentConfigFile()
 
 	// agent ID needs to stay empty in bootstrap mode
-	createAgentID := !runAsOtel
+	createAgentID := true
 	if cfg.Fleet != nil && cfg.Fleet.Server != nil && cfg.Fleet.Server.Bootstrap {
 		createAgentID = false
 	}
@@ -285,7 +284,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
 		l.Info("APM instrumentation disabled")
 	}
 
-	coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
+	coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
 	if err != nil {
 		return logReturn(l, err)
 	}
@@ -397,9 +396,6 @@ LOOP:
 	}
 	cancel()
 	err = <-appErr
-	for _, a := range awaiters {
-		<-a // wait for awaiter to be done
-	}
 
 	if logShutdown {
 		l.Info("Shutting down completed.")
@@ -410,16 +406,7 @@ LOOP:
 	return logReturn(l, err)
 }
 
-func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
-	if runAsOtel {
-		defaultCfg := configuration.DefaultConfiguration()
-		// disable monitoring to avoid injection of monitoring components
-		// in case inputs are not empty
-		defaultCfg.Settings.MonitoringConfig.Enabled = false
-		defaultCfg.Settings.V1MonitoringEnabled = false
-		return defaultCfg, nil
-	}
-
+func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
 	pathConfigFile := paths.ConfigFile()
 	rawConfig, err := config.LoadFile(pathConfigFile)
 	if err != nil {
@@ -592,7 +579,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
 			errors.M("path", enrollPath)))
 	}
 	logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
-	return loadConfig(ctx, override, false)
+	return loadConfig(ctx, override)
 }
 
 func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {
diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go
index 6e34e99ce9c..5ee8b5b4681 100644
--- a/pkg/component/runtime/manager.go
+++ b/pkg/component/runtime/manager.go
@@ -145,8 +145,7 @@ type Manager struct {
 
 	// doneChan is closed when Manager is shutting down to signal that any
 	// pending requests should be canceled.
-	doneChan  chan struct{}
-	runAsOtel bool
+	doneChan chan struct{}
 }
 
 // NewManager creates a new manager.
@@ -157,7 +156,6 @@ func NewManager(
 	tracer *apm.Tracer,
 	monitor MonitoringManager,
 	grpcConfig *configuration.GRPCConfig,
-	runAsOtel bool,
 ) (*Manager, error) {
 	ca, err := authority.NewCA()
 	if err != nil {
@@ -193,7 +191,6 @@ func NewManager(
 		grpcConfig:  grpcConfig,
 		serverReady: make(chan struct{}),
 		doneChan:    make(chan struct{}),
-		runAsOtel:   runAsOtel,
 	}
 	return m, nil
 }
@@ -212,56 +209,54 @@ func (m *Manager) Run(ctx context.Context) error {
 		server   *grpc.Server
 		wgServer sync.WaitGroup
 	)
-	if !m.runAsOtel {
-		if m.isLocal {
-			listener, err = ipc.CreateListener(m.logger, m.listenAddr)
-		} else {
-			listener, err = net.Listen("tcp", m.listenAddr)
-		}
-
-		if err != nil {
-			return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
-		}
-
-		if m.isLocal {
-			defer ipc.CleanupListener(m.logger, m.listenAddr)
-		} else {
-			m.listenPort = listener.Addr().(*net.TCPAddr).Port
-		}
+	if m.isLocal {
+		listener, err = ipc.CreateListener(m.logger, m.listenAddr)
+	} else {
+		listener, err = net.Listen("tcp", m.listenAddr)
+	}
 
-		certPool := x509.NewCertPool()
-		if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
-			return errors.New("failed to append root CA")
-		}
-		creds := credentials.NewTLS(&tls.Config{
-			ClientAuth:     tls.RequireAndVerifyClientCert,
-			ClientCAs:      certPool,
-			GetCertificate: m.getCertificate,
-			MinVersion:     tls.VersionTLS12,
-		})
-		m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
-		if m.tracer != nil {
-			apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
-			server = grpc.NewServer(
-				grpc.UnaryInterceptor(apmInterceptor),
-				grpc.Creds(creds),
-				grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
-			)
-		} else {
-			server = grpc.NewServer(
-				grpc.Creds(creds),
-				grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
-			)
-		}
-		proto.RegisterElasticAgentServer(server, m)
+	if err != nil {
+		return fmt.Errorf("error starting tcp listener for runtime manager: %w", err)
+	}
 
-		// start serving GRPC connections
-		wgServer.Add(1)
-		go func() {
-			defer wgServer.Done()
-			m.serverLoop(ctx, listener, server)
-		}()
+	if m.isLocal {
+		defer ipc.CleanupListener(m.logger, m.listenAddr)
+	} else {
+		m.listenPort = listener.Addr().(*net.TCPAddr).Port
+	}
+
+	certPool := x509.NewCertPool()
+	if ok := certPool.AppendCertsFromPEM(m.ca.Crt()); !ok {
+		return errors.New("failed to append root CA")
+	}
+	creds := credentials.NewTLS(&tls.Config{
+		ClientAuth:     tls.RequireAndVerifyClientCert,
+		ClientCAs:      certPool,
+		GetCertificate: m.getCertificate,
+		MinVersion:     tls.VersionTLS12,
+	})
+	m.logger.Infof("Starting grpc control protocol listener on port %v with max_message_size %v", m.grpcConfig.Port, m.grpcConfig.MaxMsgSize)
+	if m.tracer != nil {
+		apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(m.tracer))
+		server = grpc.NewServer(
+			grpc.UnaryInterceptor(apmInterceptor),
+			grpc.Creds(creds),
+			grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
+		)
+	} else {
+		server = grpc.NewServer(
+			grpc.Creds(creds),
+			grpc.MaxRecvMsgSize(m.grpcConfig.MaxMsgSize),
+		)
 	}
+	proto.RegisterElasticAgentServer(server, m)
+
+	// start serving GRPC connections
+	wgServer.Add(1)
+	go func() {
+		defer wgServer.Done()
+		m.serverLoop(ctx, listener, server)
+	}()
 
 	// Start the run loop, which continues on the main goroutine
 	// until the context is canceled.
@@ -271,13 +266,11 @@ func (m *Manager) Run(ctx context.Context) error {
 	m.shutdown()
 
 	// Close the rpc listener and wait for serverLoop to return
-	if !m.runAsOtel {
-		listener.Close()
-		wgServer.Wait()
+	listener.Close()
+	wgServer.Wait()
 
-		// Cancel any remaining connections
-		server.Stop()
-	}
+	// Cancel any remaining connections
+	server.Stop()
 
 	return ctx.Err()
 }
diff --git a/pkg/component/runtime/manager_fake_input_test.go b/pkg/component/runtime/manager_fake_input_test.go
index 22b960cc258..e177994196d 100644
--- a/pkg/component/runtime/manager_fake_input_test.go
+++ b/pkg/component/runtime/manager_fake_input_test.go
@@ -97,8 +97,7 @@ func (suite *FakeInputSuite) TestManager_Features() {
 		agentInfo,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false)
+		testGrpcConfig())
 	require.NoError(t, err)
 
 	managerErrCh := make(chan error)
@@ -298,8 +297,7 @@ func (suite *FakeInputSuite) TestManager_APM() {
 		agentInfo,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false)
+		testGrpcConfig())
 	require.NoError(t, err)
 
 	managerErrCh := make(chan error)
@@ -575,9 +573,7 @@ func (suite *FakeInputSuite) TestManager_Limits() {
 		agentInfo,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false,
-	)
+		testGrpcConfig())
 	require.NoError(t, err)
 
 	managerErrCh := make(chan error)
@@ -733,8 +729,7 @@ func (suite *FakeInputSuite) TestManager_BadUnitToGood() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -903,8 +898,7 @@ func (suite *FakeInputSuite) TestManager_GoodUnitToBad() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	runResultChan := make(chan error, 1)
 	go func() {
@@ -1086,8 +1080,7 @@ func (suite *FakeInputSuite) TestManager_NoDeadlock() {
 
 	// Create the runtime manager
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 
 	// Start the runtime manager in a goroutine, passing its termination state
@@ -1161,8 +1154,7 @@ func (suite *FakeInputSuite) TestManager_Configure() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -1284,8 +1276,7 @@ func (suite *FakeInputSuite) TestManager_RemoveUnit() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -1440,8 +1431,7 @@ func (suite *FakeInputSuite) TestManager_ActionState() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -1566,8 +1556,7 @@ func (suite *FakeInputSuite) TestManager_Restarts() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -1703,8 +1692,7 @@ func (suite *FakeInputSuite) TestManager_Restarts_ConfigKill() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -1848,8 +1836,7 @@ func (suite *FakeInputSuite) TestManager_KeepsRestarting() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -1993,8 +1980,7 @@ func (suite *FakeInputSuite) TestManager_RestartsOnMissedCheckins() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -2113,8 +2099,7 @@ func (suite *FakeInputSuite) TestManager_InvalidAction() {
 	defer cancel()
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig(),
-		false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), testGrpcConfig())
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
@@ -2238,8 +2223,7 @@ func (suite *FakeInputSuite) TestManager_MultiComponent() {
 		agentInfo,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false)
+		testGrpcConfig())
 	require.NoError(t, err)
 
 	errCh := make(chan error)
@@ -2452,8 +2436,7 @@ func (suite *FakeInputSuite) TestManager_LogLevel() {
 		ai,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false)
+		testGrpcConfig())
 	require.NoError(t, err)
 
 	errCh := make(chan error)
@@ -2594,8 +2577,7 @@ func (suite *FakeInputSuite) TestManager_StartStopComponent() {
 		ai,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false)
+		testGrpcConfig())
 	require.NoError(t, err, "could not crete new manager")
 
 	managerErrCh := make(chan error)
@@ -2782,7 +2764,7 @@ func (suite *FakeInputSuite) TestManager_Chunk() {
 	grpcConfig.MaxMsgSize = grpcDefaultSize * 2 // set to double the default size
 
 	ai := &info.AgentInfo{}
-	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), grpcConfig, false)
+	m, err := NewManager(newDebugLogger(t), newDebugLogger(t), ai, apmtest.DiscardTracer, newTestMonitoringMgr(), grpcConfig)
 	require.NoError(t, err)
 	errCh := make(chan error)
 	go func() {
diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go
index 0bfaf75ff4e..3423d50514c 100644
--- a/pkg/component/runtime/manager_test.go
+++ b/pkg/component/runtime/manager_test.go
@@ -32,9 +32,7 @@ func TestManager_SimpleComponentErr(t *testing.T) {
 		ai,
 		apmtest.DiscardTracer,
 		newTestMonitoringMgr(),
-		testGrpcConfig(),
-		false,
-	)
+		testGrpcConfig())
 	require.NoError(t, err)
 
 	errCh := make(chan error)
diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go
index b8619622ab4..5da090f1a07 100644
--- a/pkg/testing/fixture.go
+++ b/pkg/testing/fixture.go
@@ -477,8 +477,8 @@ func RunProcess(t *testing.T,
 // when `Run` is called.
 //
 // if shouldWatchState is set to false, communicating state does not happen.
-func (f *Fixture) RunOtelWithClient(ctx context.Context, shouldWatchState bool, enableTestingMode bool, states ...State) error {
-	return f.executeWithClient(ctx, "otel", false, shouldWatchState, enableTestingMode, states...)
+func (f *Fixture) RunOtelWithClient(ctx context.Context, states ...State) error {
+	return f.executeWithClient(ctx, "otel", false, false, false, states...)
 }
 
 func (f *Fixture) executeWithClient(ctx context.Context, command string, disableEncryptedStore bool, shouldWatchState bool, enableTestingMode bool, states ...State) error {
diff --git a/testing/integration/kubernetes_agent_service_test.go b/testing/integration/kubernetes_agent_service_test.go
index 4a5ebdda2ad..de015a06d15 100644
--- a/testing/integration/kubernetes_agent_service_test.go
+++ b/testing/integration/kubernetes_agent_service_test.go
@@ -123,7 +123,7 @@ func TestKubernetesAgentService(t *testing.T) {
 
 	ctx := context.Background()
 
-	deployK8SAgent(t, ctx, client, k8sObjects, testNamespace, false, testLogsBasePath, map[string]bool{
+	deployK8SAgent(t, ctx, client, k8sObjects, testNamespace, false, testLogsBasePath, true, map[string]bool{
 		"connectors-py": true,
 	})
 }
diff --git a/testing/integration/kubernetes_agent_standalone_test.go b/testing/integration/kubernetes_agent_standalone_test.go
index ddcbb559cca..bce103996f5 100644
--- a/testing/integration/kubernetes_agent_standalone_test.go
+++ b/testing/integration/kubernetes_agent_standalone_test.go
@@ -221,7 +221,7 @@ func TestKubernetesAgentStandaloneKustomize(t *testing.T) {
 
 			ctx := context.Background()
 
-			deployK8SAgent(t, ctx, client, k8sObjects, testNamespace, tc.runK8SInnerTests, testLogsBasePath, nil)
+			deployK8SAgent(t, ctx, client, k8sObjects, testNamespace, tc.runK8SInnerTests, testLogsBasePath, true, nil)
 		})
 	}
@@ -266,10 +266,9 @@ func TestKubernetesAgentOtel(t *testing.T) {
 	require.NoError(t, err, "failed to render kustomize")
 
 	testCases := []struct {
-		name              string
-		envAdd            []corev1.EnvVar
-		runK8SInnerTests  bool
-		componentPresence map[string]bool
+		name             string
+		envAdd           []corev1.EnvVar
+		runK8SInnerTests bool
 	}{
 
 		{
@@ -278,11 +277,6 @@ func TestKubernetesAgentOtel(t *testing.T) {
 				{Name: "ELASTIC_AGENT_OTEL", Value: "true"},
 			},
 			false,
-			map[string]bool{
-				"beat/metrics-monitoring": false,
-				"filestream-monitoring":   false,
-				"system/metrics-default":  false,
-			},
 		},
 	}
 
@@ -340,7 +334,7 @@ func TestKubernetesAgentOtel(t *testing.T) {
 
 			ctx := context.Background()
 
-			deployK8SAgent(t, ctx, client, k8sObjects, testNamespace, tc.runK8SInnerTests, testLogsBasePath, tc.componentPresence)
+			deployK8SAgent(t, ctx, client, k8sObjects, testNamespace, tc.runK8SInnerTests, testLogsBasePath, false, nil)
 		})
 	}
 }
@@ -601,7 +595,7 @@ func TestKubernetesAgentHelm(t *testing.T) {
 // deployK8SAgent is a helper function to deploy the elastic-agent in k8s and invoke the inner k8s tests if
 // runK8SInnerTests is true
 func deployK8SAgent(t *testing.T, ctx context.Context, client klient.Client, objects []k8s.Object, namespace string,
-	runInnerK8STests bool, testLogsBasePath string, componentPresence map[string]bool) {
+	runInnerK8STests bool, testLogsBasePath string, checkStatus bool, componentPresence map[string]bool) {
 
 	objects = append([]k8s.Object{&corev1.Namespace{
 		ObjectMeta: metav1.ObjectMeta{
@@ -661,6 +655,11 @@ func deployK8SAgent(t *testing.T, ctx context.Context, client klient.Client, obj
 	require.NotEmpty(t, agentPodName, "agent pod name is empty")
 
+	if !checkStatus {
+		// not checking status
+		return
+	}
+
 	command := []string{"elastic-agent", "status", "--output=json"}
 	var status atesting.AgentStatusOutput
 	var stdout, stderr bytes.Buffer
diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go
index 7e9ce375c84..6fe4c3c28eb 100644
--- a/testing/integration/otel_test.go
+++ b/testing/integration/otel_test.go
@@ -20,10 +20,7 @@ import (
 
 	"github.com/stretchr/testify/require"
 
-	"github.com/elastic/elastic-agent/pkg/control/v2/client"
 	aTesting "github.com/elastic/elastic-agent/pkg/testing"
-	atesting "github.com/elastic/elastic-agent/pkg/testing"
-	integrationtest "github.com/elastic/elastic-agent/pkg/testing"
 	"github.com/elastic/elastic-agent/pkg/testing/define"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
@@ -146,7 +143,7 @@ func TestOtelFileProcessing(t *testing.T) {
 	fixtureWg.Add(1)
 	go func() {
 		defer fixtureWg.Done()
-		err = fixture.RunOtelWithClient(ctx, false, false)
+		err = fixture.RunOtelWithClient(ctx)
 	}()
 
 	var content []byte
@@ -157,22 +154,6 @@ func TestOtelFileProcessing(t *testing.T) {
 
 	validateCommandIsWorking(t, ctx, fixture, tempDir)
 
-	// check `elastic-agent status` returns successfully
-	require.Eventuallyf(t, func() bool {
-		// This will return errors until it connects to the agent,
-		// they're mostly noise because until the agent starts running
-		// we will get connection errors. If the test fails
-		// the agent logs will be present in the error message
-		// which should help to explain why the agent was not
-		// healthy.
-		err = fixture.IsHealthy(ctx)
-		return err == nil
-	},
-		2*time.Minute, time.Second,
-		"Elastic-Agent did not report healthy. Agent status error: \"%v\"",
-		err,
-	)
-
 	require.Eventually(t, func() bool {
 		// verify file exists
@@ -195,8 +176,6 @@ func TestOtelFileProcessing(t *testing.T) {
 		3*time.Minute, 500*time.Millisecond,
 		fmt.Sprintf("there should be exported logs by now"))
 
-	testAgentCanRun(ctx, t, fixture)
-
 	cancel()
 	fixtureWg.Wait()
 	require.True(t, err == nil || err == context.Canceled || err == context.DeadlineExceeded, "Retrieved unexpected error: %s", err.Error())
@@ -314,27 +293,11 @@ func TestOtelLogsIngestion(t *testing.T) {
 	fixtureWg.Add(1)
 	go func() {
 		defer fixtureWg.Done()
-		err = fixture.RunOtelWithClient(ctx, false, false)
+		err = fixture.RunOtelWithClient(ctx)
 	}()
 
 	validateCommandIsWorking(t, ctx, fixture, tempDir)
 
-	// check `elastic-agent status` returns successfully
-	require.Eventuallyf(t, func() bool {
-		// This will return errors until it connects to the agent,
-		// they're mostly noise because until the agent starts running
-		// we will get connection errors. If the test fails
-		// the agent logs will be present in the error message
-		// which should help to explain why the agent was not
-		// healthy.
-		err = fixture.IsHealthy(ctx)
-		return err == nil
-	},
-		2*time.Minute, time.Second,
-		"Elastic-Agent did not report healthy. Agent status error: \"%v\"",
-		err,
-	)
-
 	// Write logs to input file.
 	logsCount := 10_000
 	inputFile, err := os.OpenFile(inputFilePath, os.O_CREATE|os.O_WRONLY, 0600)
@@ -451,7 +414,7 @@ func TestOtelAPMIngestion(t *testing.T) {
 	var fixtureWg sync.WaitGroup
 	fixtureWg.Add(1)
 	go func() {
-		fixture.RunOtelWithClient(ctx, false, false)
+		fixture.RunOtelWithClient(ctx)
 		fixtureWg.Done()
 	}()
 
@@ -463,22 +426,6 @@ func TestOtelAPMIngestion(t *testing.T) {
 	)
 	require.NoError(t, err, "APM not initialized")
 
-	// wait for otel collector to start
-	require.Eventuallyf(t, func() bool {
-		// This will return errors until it connects to the agent,
-		// they're mostly noise because until the agent starts running
-		// we will get connection errors. If the test fails
-		// the agent logs will be present in the error message
-		// which should help to explain why the agent was not
-		// healthy.
-		err = fixture.IsHealthy(ctx)
-		return err == nil
-	},
-		2*time.Minute, time.Second,
-		"Elastic-Agent did not report healthy. Agent status error: \"%v\"",
-		err,
-	)
-
 	require.NoError(t, os.WriteFile(filepath.Join(tempDir, fileName), []byte(apmProcessingContent), 0600))
 
 	// check index
@@ -588,30 +535,3 @@ func mapAtLeastOneTrue(mm map[string]bool) bool {
 
 	return false
 }
-
-func testAgentCanRun(ctx context.Context, t *testing.T, fixture *atesting.Fixture) func(*testing.T) {
-	tCtx, cancel := testcontext.WithDeadline(t, ctx, time.Now().Add(5*time.Minute))
-	defer cancel()
-
-	return func(t *testing.T) {
-		// Get path to Elastic Agent executable
-		devFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
-		require.NoError(t, err)
-
-		// Prepare the Elastic Agent so the binary is extracted and ready to use.
-		err = devFixture.Prepare(tCtx)
-		require.NoError(t, err)
-
-		require.Eventually(
-			t,
-			func() bool {
-				return nil == devFixture.Run(tCtx, integrationtest.State{
-					Configure:  complexIsolatedUnitsConfig,
-					AgentState: integrationtest.NewClientState(client.Healthy),
-				})
-			},
-			5*time.Minute, 500*time.Millisecond,
-			"Agent should not return error",
-		)
-	}
-}
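Note: taken together, the updated test-facing call shapes reduce to the sketch below, assembled from the hunks above. newDebugLogger, newTestMonitoringMgr, and testGrpcConfig are the existing test helpers referenced in the diff; nothing beyond the diff is introduced:

```go
// runtime.NewManager drops its trailing runAsOtel bool, matching the
// coordinator_test.go and manager test hunks above.
m, err := runtime.NewManager(
	newDebugLogger(t), newDebugLogger(t),
	&info.AgentInfo{},
	apmtest.DiscardTracer,
	newTestMonitoringMgr(),
	testGrpcConfig(),
)
require.NoError(t, err)

// Fixture.RunOtelWithClient drops its state-watching flags: the otel
// collector now runs without the agent control protocol, so there is no
// agent state to watch and no `elastic-agent status` to poll for health.
err = fixture.RunOtelWithClient(ctx)
```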