From a5262e16596c8c6f958b7a038acc54e273635ff9 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Thu, 22 Feb 2024 18:14:55 +0100 Subject: [PATCH] add debug log and fix flaky test (#4290) - add debug logs on Manager.update for completeness - Refactor and rename TestManager_OutputChange - use eventually instead of time.Sleep - rename to better reflect what it tests - Remove TestManager_StartStop as TestManager_StartStopComponent already covers it --- pkg/component/runtime/manager.go | 52 +-- .../runtime/manager_fake_input_test.go | 306 ++++-------------- 2 files changed, 96 insertions(+), 262 deletions(-) diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go index a3731673b3e..6109607b46f 100644 --- a/pkg/component/runtime/manager.go +++ b/pkg/component/runtime/manager.go @@ -772,24 +772,27 @@ func (m *Manager) update(model component.Model, teardown bool) error { stop = append(stop, existing) } m.currentMx.RUnlock() - if len(stop) > 0 { - var stoppedWg sync.WaitGroup - stoppedWg.Add(len(stop)) - for _, existing := range stop { - m.logger.Debugf("Stopping component %q", existing.id) - _ = existing.stop(teardown, model.Signed) - // stop is async, wait for operation to finish, - // otherwise new instance may be started and components - // may fight for resources (e.g ports, files, locks) - go func(state *componentRuntimeState) { - m.waitForStopped(state) - stoppedWg.Done() - }(existing) - } - stoppedWg.Wait() + + var stoppedWg sync.WaitGroup + stoppedWg.Add(len(stop)) + for _, existing := range stop { + m.logger.Debugf("Stopping component %q", existing.id) + _ = existing.stop(teardown, model.Signed) + // stop is async, wait for operation to finish, + // otherwise new instance may be started and components + // may fight for resources (e.g. 
ports, files, locks) + go func(state *componentRuntimeState) { + err := m.waitForStopped(state) + if err != nil { + m.logger.Errorf("updating components: failed waiting for %s to stop", + state.id) + } + stoppedWg.Done() + }(existing) } + stoppedWg.Wait() - // start all not started + // start new components for _, comp := range newComponents { // new component; create its runtime logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID)) @@ -800,6 +803,7 @@ func (m *Manager) update(model component.Model, teardown bool) error { m.currentMx.Lock() m.current[comp.ID] = state m.currentMx.Unlock() + m.logger.Debugf("Starting component %q", comp.ID) if err = state.start(); err != nil { return fmt.Errorf("failed to start component %s: %w", comp.ID, err) } @@ -808,10 +812,11 @@ func (m *Manager) update(model component.Model, teardown bool) error { return nil } -func (m *Manager) waitForStopped(comp *componentRuntimeState) { +func (m *Manager) waitForStopped(comp *componentRuntimeState) error { if comp == nil { - return + return nil } + currComp := comp.getCurrent() compID := currComp.ID timeout := defaultStopTimeout @@ -828,20 +833,23 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) { latestState := comp.getLatest() if latestState.State == client.UnitStateStopped { m.logger.Debugf("component %q stopped.", compID) - return + return nil } + // it might happen the component stop signal isn't received but the + // manager detects it stopped running. Then the manager removes it from + // its list of current components. Therefore, we also need to check if + // the component was removed, if it was, we consider it stopped. 
m.currentMx.RLock() if _, exists := m.current[compID]; !exists { m.currentMx.RUnlock() - return + return nil } m.currentMx.RUnlock() select { case <-timeoutCh: - m.logger.Errorf("timeout exceeded waiting for component %q to stop", compID) - return + return fmt.Errorf("timeout exceeded after %s", timeout) case <-time.After(stopCheckRetryPeriod): } } diff --git a/pkg/component/runtime/manager_fake_input_test.go b/pkg/component/runtime/manager_fake_input_test.go index 8b0931def51..2471be6bd5b 100644 --- a/pkg/component/runtime/manager_fake_input_test.go +++ b/pkg/component/runtime/manager_fake_input_test.go @@ -25,6 +25,7 @@ import ( gproto "google.golang.org/protobuf/proto" fakecmp "github.com/elastic/elastic-agent/pkg/component/fake/component/comp" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -98,127 +99,6 @@ func (suite *FakeInputSuite) setupTestPaths() { paths.SetVersionHome(false) } -func (suite *FakeInputSuite) TestManager_StartStop() { - t := suite.T() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ai := &info.AgentInfo{} - m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig()) - require.NoError(t, err) - errCh := make(chan error) - go func() { - err := m.Run(ctx) - if errors.Is(err, context.Canceled) { - err = nil - } - errCh <- err - }() - - waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) - defer waitCancel() - if err := waitForReady(waitCtx, m); err != nil { - require.NoError(t, err) - } - - binaryPath := testBinary(t, "component") - comp := component.Component{ - ID: "fake-default", - InputSpec: &component.InputRuntimeSpec{ - InputType: "fake", - BinaryName: "", - BinaryPath: binaryPath, - Spec: fakeInputSpec, - }, - Units: []component.Unit{ - { - ID: "fake-input", - Type: client.UnitTypeInput, - LogLevel: client.UnitLogLevelTrace, - 
Config: component.MustExpectedConfig(map[string]interface{}{ - "type": "fake", - "state": int(client.UnitStateHealthy), - "message": "Fake Healthy", - }), - }, - }, - } - - subCtx, subCancel := context.WithCancel(context.Background()) - defer subCancel() - subErrCh := make(chan error) - go func() { - sub := m.Subscribe(subCtx, "fake-default") - for { - select { - case <-subCtx.Done(): - return - case state := <-sub.Ch(): - t.Logf("component state changed: %+v", state) - if state.State == client.UnitStateFailed { - subErrCh <- fmt.Errorf("component failed: %s", state.Message) - } else { - unit, ok := state.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] - if ok { - if unit.State == client.UnitStateFailed { - subErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - } else if unit.State == client.UnitStateHealthy { - // remove the component which will stop it - m.Update(component.Model{Components: []component.Component{}}) - err := <-m.errCh - if err != nil { - subErrCh <- err - } - } else if unit.State == client.UnitStateStopped { - subErrCh <- nil - } else if unit.State == client.UnitStateStarting { - // acceptable - } else { - // unknown state that should not have occurred - subErrCh <- fmt.Errorf("unit reported unexpected state: %v", unit.State) - } - } else { - subErrCh <- errors.New("unit missing: fake-input") - } - } - } - } - }() - - defer drainErrChan(errCh) - defer drainErrChan(subErrCh) - - m.Update(component.Model{Components: []component.Component{comp}}) - err = <-m.errCh - require.NoError(t, err) - - endTimer := time.NewTimer(30 * time.Second) - defer endTimer.Stop() -LOOP: - for { - select { - case <-endTimer.C: - t.Fatalf("timed out after 30 seconds") - case err := <-errCh: - require.NoError(t, err) - case err := <-subErrCh: - require.NoError(t, err) - break LOOP - } - } - - subCancel() - cancel() - - err = <-errCh - require.NoError(t, err) - - workDir := filepath.Join(paths.Run(), comp.ID) - _, err = os.Stat(workDir) - 
require.ErrorIs(t, err, os.ErrNotExist) -} - func (suite *FakeInputSuite) TestManager_Features() { t := suite.T() @@ -3134,15 +3014,16 @@ LOOP: require.NoError(t, err) } -func (suite *FakeInputSuite) TestManager_OutputChange() { +func (suite *FakeInputSuite) TestManager_StartStopComponent() { t := suite.T() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + log, logs := logger.NewTesting("TestManager_StartStopComponent") ai := &info.AgentInfo{} m, err := NewManager( - newDebugLogger(t), + log, newDebugLogger(t), "localhost:0", ai, @@ -3151,14 +3032,14 @@ func (suite *FakeInputSuite) TestManager_OutputChange() { configuration.DefaultGRPCConfig()) require.NoError(t, err, "could not crete new manager") - errCh := make(chan error) - t.Cleanup(func() { drainErrChan(errCh) }) + managerErrCh := make(chan error) go func() { + defer close(managerErrCh) err := m.Run(ctx) if errors.Is(err, context.Canceled) { err = nil } - errCh <- err + managerErrCh <- err }() waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second) @@ -3174,12 +3055,12 @@ func (suite *FakeInputSuite) TestManager_OutputChange() { BinaryPath: binaryPath, Spec: fakeInputSpec, } - const IDComp0 = "fake-0" - const IDComp1 = "fake-1" + const comp0ID = "fake-0" + const comp1ID = "fake-1" components := []component.Component{ { - ID: IDComp0, + ID: comp0ID, InputSpec: &runtimeSpec, Units: []component.Unit{ { @@ -3215,7 +3096,7 @@ func (suite *FakeInputSuite) TestManager_OutputChange() { components2 := []component.Component{ { - ID: IDComp1, + ID: comp1ID, InputSpec: &runtimeSpec, Units: []component.Unit{ { @@ -3249,131 +3130,76 @@ func (suite *FakeInputSuite) TestManager_OutputChange() { }, } - type progressionStep struct { - componentID string - state ComponentState - } - var stateProgression []progressionStep - - subCtx, subCancel := context.WithCancel(context.Background()) - defer subCancel() - - stateProgressionCh := make(chan progressionStep) - subErrCh0 := make(chan error) - 
subErrCh1 := make(chan error) - t.Cleanup(func() { drainErrChan(subErrCh0) }) - t.Cleanup(func() { drainErrChan(subErrCh1) }) - - go func() { - sub0 := m.Subscribe(subCtx, IDComp0) - sub1 := m.Subscribe(subCtx, IDComp1) - for { - select { - case <-subCtx.Done(): - close(stateProgressionCh) - return - case state := <-sub0.Ch(): - t.Logf("component %s state changed: %+v", IDComp0, state) - signalState( - subErrCh0, - &state, - []client.UnitState{client.UnitStateHealthy, client.UnitStateStopped}) - stateProgressionCh <- progressionStep{IDComp0, state} - - case state := <-sub1.Ch(): - t.Logf("component %s state changed: %+v", IDComp1, state) - signalState( - subErrCh1, - &state, - []client.UnitState{client.UnitStateHealthy}) - stateProgressionCh <- progressionStep{IDComp1, state} - } - } - }() - - var stateProgressionWG sync.WaitGroup - stateProgressionWG.Add(1) - go func() { - for step := range stateProgressionCh { - stateProgression = append(stateProgression, step) - } - stateProgressionWG.Done() - }() - - err = waitForReady(waitCtx, m) - require.NoError(t, err, "Manager must finish initializing") - select { - case err := <-errCh: - t.Fatalf("failed early: %s", err) + case err := <-managerErrCh: + require.NoError(t, err, + "Manager.Run returned an error before 1st component update") default: } - time.Sleep(100 * time.Millisecond) m.Update(component.Model{Components: components}) err = <-m.errCh - require.NoError(t, err) + require.NoError(t, err, "expected no error from the manager when applying "+ + "the 1st component model") + + // Wait for the 1st config to be applied and the comp0ID to start + require.Eventuallyf(t, + func() bool { + filtered := logs.FilterMessageSnippet( + fmt.Sprintf("Starting component %q", comp0ID)). 
+ TakeAll() + return len(filtered) > 0 + }, + 30*time.Second, + 200*time.Millisecond, + "component %s did not start", comp0ID) - updateSleep := 300 * time.Millisecond - if runtime.GOOS == component.Windows { - // windows is slow, preventing flakiness - updateSleep = time.Second - } - time.Sleep(updateSleep) m.Update(component.Model{Components: components2}) err = <-m.errCh - require.NoError(t, err) + require.NoError(t, err, "expected no error from the manager when applying "+ + "the 2nd component model") + + // Wait for the 2nd config to be applied and the comp1ID to start + require.Eventuallyf(t, + func() bool { + filtered := logs.FilterMessageSnippet( + fmt.Sprintf("Starting component %q", comp1ID)). + TakeAll() + return len(filtered) > 0 + }, + 30*time.Second, + 200*time.Millisecond, + "component %s did not start", comp1ID) - count := 0 - timeout := 30 * time.Second - endTimer := time.NewTimer(timeout) - defer endTimer.Stop() + // component 1 started, we can stop the manager + cancel() -LOOP: - for { - select { - case <-endTimer.C: - t.Fatalf("timed out after %s seconds, "+ - "did not receive enought state changes", timeout) - case err := <-errCh: - require.NoError(t, err) - case err := <-subErrCh0: - t.Logf("[subErrCh0] received: %v", err) - require.NoError(t, err) - count++ - if count >= 2 { - break LOOP - } - case err := <-subErrCh1: - t.Logf("[subErrCh1] received: %v", err) - require.NoError(t, err) - count++ - if count >= 2 { - break LOOP - } - } - } + comp0StartLogs := logs.FilterMessageSnippet( + fmt.Sprintf("Starting component %q", comp0ID)).TakeAll() + comp0StopLogs := logs.FilterMessageSnippet( + fmt.Sprintf("Stopping component %q", comp0ID)).TakeAll() + comp1StartLogs := logs.FilterMessageSnippet( + fmt.Sprintf("Starting component %q", comp1ID)).TakeAll() - subCancel() - cancel() + assert.Len(t, comp0StartLogs, 1, + "component %s started more than once", comp0ID) + assert.Len(t, comp0StopLogs, 1, + "component %s stopped more than once", comp0ID) + 
assert.Len(t, comp1StartLogs, 1, - "component %s started more than once", comp1ID) - - // check progression, require stop fake-0 before start fake-1 - stateProgressionWG.Wait() - comp0Stopped := false - for _, step := range stateProgression { - if step.componentID == IDComp0 && - step.state.State == client.UnitStateStopped { - comp0Stopped = true - } - if step.componentID == IDComp1 && - step.state.State == client.UnitStateStarting { - require.True(t, comp0Stopped) - break + assert.Truef(t, comp0StopLogs[0].Time.Before(comp1StartLogs[0].Time), + "component %s stopped after %s", comp0ID, comp1ID) + + err = <-managerErrCh + assert.NoError(t, err, "Manager.Run returned an error") + + if t.Failed() { + t.Logf("manager logs:") + for _, l := range logs.TakeAll() { + t.Log(l) } } - - err = <-errCh - require.NoError(t, err) } func (suite *FakeInputSuite) TestManager_Chunk() {