Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flaky Test]: fix TestFakeInputSuite/TestManager_OutputChange #4290

Merged
52 changes: 30 additions & 22 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,24 +772,27 @@ func (m *Manager) update(model component.Model, teardown bool) error {
stop = append(stop, existing)
}
m.currentMx.RUnlock()
if len(stop) > 0 {
var stoppedWg sync.WaitGroup
stoppedWg.Add(len(stop))
for _, existing := range stop {
m.logger.Debugf("Stopping component %q", existing.id)
_ = existing.stop(teardown, model.Signed)
// stop is async, wait for operation to finish,
// otherwise new instance may be started and components
// may fight for resources (e.g ports, files, locks)
go func(state *componentRuntimeState) {
m.waitForStopped(state)
stoppedWg.Done()
}(existing)
}
stoppedWg.Wait()

var stoppedWg sync.WaitGroup
stoppedWg.Add(len(stop))
for _, existing := range stop {
m.logger.Debugf("Stopping component %q", existing.id)
_ = existing.stop(teardown, model.Signed)
// stop is async, wait for operation to finish,
// otherwise new instance may be started and components
// may fight for resources (e.g. ports, files, locks)
go func(state *componentRuntimeState) {
err := m.waitForStopped(state)
if err != nil {
m.logger.Errorf("updating components: failed waiting %s stop",
state.id)
}
stoppedWg.Done()
}(existing)
}
stoppedWg.Wait()

// start all not started
// start new components
for _, comp := range newComponents {
// new component; create its runtime
logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID))
Expand All @@ -800,6 +803,7 @@ func (m *Manager) update(model component.Model, teardown bool) error {
m.currentMx.Lock()
m.current[comp.ID] = state
m.currentMx.Unlock()
m.logger.Debugf("Starting component %q", comp.ID)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added for completeness, since there is already a log for stopping components. It makes sense to have one for starting and updating components as well.

if err = state.start(); err != nil {
return fmt.Errorf("failed to start component %s: %w", comp.ID, err)
}
Expand All @@ -808,10 +812,11 @@ func (m *Manager) update(model component.Model, teardown bool) error {
return nil
}

func (m *Manager) waitForStopped(comp *componentRuntimeState) {
func (m *Manager) waitForStopped(comp *componentRuntimeState) error {
if comp == nil {
return
return nil
}

currComp := comp.getCurrent()
compID := currComp.ID
timeout := defaultStopTimeout
Expand All @@ -828,20 +833,23 @@ func (m *Manager) waitForStopped(comp *componentRuntimeState) {
latestState := comp.getLatest()
if latestState.State == client.UnitStateStopped {
m.logger.Debugf("component %q stopped.", compID)
return
return nil
}

// it might happen the component stop signal isn't received but the
// manager detects it stopped running. Then the manager removes it from
// its list of current components. Therefore, we also need to check if
// the component was removed, if it was, we consider it stopped.
m.currentMx.RLock()
if _, exists := m.current[compID]; !exists {
m.currentMx.RUnlock()
return
return nil
}
m.currentMx.RUnlock()

select {
case <-timeoutCh:
m.logger.Errorf("timeout exceeded waiting for component %q to stop", compID)
return
return fmt.Errorf("timeout exceeded after %s", timeout)
case <-time.After(stopCheckRetryPeriod):
}
}
Expand Down
Loading
Loading