Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.6](backport #2226) Start/stop components in a more synchronised manner #2233

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 65 additions & 14 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (
maxCheckinMisses = 3
// diagnosticTimeout is the maximum amount of time to wait for a diagnostic response from a unit.
diagnosticTimeout = 20 * time.Second

// stopCheckRetryPeriod is a idle time between checks for component stopped state
stopCheckRetryPeriod = 200 * time.Millisecond
)

var (
Expand Down Expand Up @@ -646,40 +649,88 @@ func (m *Manager) update(components []component.Component, teardown bool) error
}

touched := make(map[string]bool)
newComponents := make([]component.Component, 0, len(components))
for _, comp := range components {
touched[comp.ID] = true
existing, ok := m.current[comp.ID]
if ok {
if existing, ok := m.current[comp.ID]; ok {
// existing component; send runtime updated value
existing.currComp = comp
if err := existing.runtime.Update(comp); err != nil {
return fmt.Errorf("failed to update component %s: %w", comp.ID, err)
}
} else {
// new component; create its runtime
logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID))
state, err := newComponentRuntimeState(m, logger, m.monitor, comp)
if err != nil {
return fmt.Errorf("failed to create new component %s: %w", comp.ID, err)
}
m.current[comp.ID] = state
err = state.start()
if err != nil {
return fmt.Errorf("failed to start component %s: %w", comp.ID, err)
}
continue
}
newComponents = append(newComponents, comp)
}

var stoppedWg sync.WaitGroup
for id, existing := range m.current {
// skip if already touched (meaning it still existing)
if _, done := touched[id]; done {
continue
}
// component was removed (time to clean it up)
_ = existing.stop(teardown)
// stop is async, wait for operation to finish,
// otherwise new instance may be started and components
// may fight for resources (e.g ports, files, locks)
stoppedWg.Add(1)
go func(state *componentRuntimeState) {
m.waitForStopped(state)
stoppedWg.Done()
}(existing)
}

stoppedWg.Wait()

// start all not started
for _, comp := range newComponents {
// new component; create its runtime
logger := m.baseLogger.Named(fmt.Sprintf("component.runtime.%s", comp.ID))
state, err := newComponentRuntimeState(m, logger, m.monitor, comp)
if err != nil {
return fmt.Errorf("failed to create new component %s: %w", comp.ID, err)
}
m.current[comp.ID] = state
if err = state.start(); err != nil {
return fmt.Errorf("failed to start component %s: %w", comp.ID, err)
}
}

return nil
}

func (m *Manager) waitForStopped(comp *componentRuntimeState) {
if comp == nil {
return
}
compID := comp.currComp.ID
timeout := defaultStopTimeout
if comp.currComp.InputSpec != nil &&
comp.currComp.InputSpec.Spec.Service != nil &&
comp.currComp.InputSpec.Spec.Service.Operations.Uninstall != nil &&
comp.currComp.InputSpec.Spec.Service.Operations.Uninstall.Timeout > 0 {
// if component is a service and timeout is defined, use the one defined
timeout = comp.currComp.InputSpec.Spec.Service.Operations.Uninstall.Timeout
}

timeoutCh := time.After(timeout)
for {
if comp.latestState.State == client.UnitStateStopped {
return
}

if _, exists := m.current[compID]; !exists {
return
}

select {
case <-timeoutCh:
return
case <-time.After(stopCheckRetryPeriod):
}
}
}
func (m *Manager) shutdown() {
m.shuttingDown.Store(true)

Expand Down
232 changes: 227 additions & 5 deletions pkg/component/runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,13 +1866,13 @@ func TestManager_FakeInput_MultiComponent(t *testing.T) {
return
case state := <-sub0.Ch():
t.Logf("component fake-0 state changed: %+v", state)
signalState(subErrCh0, &state)
signalState(subErrCh0, &state, []client.UnitState{client.UnitStateHealthy})
case state := <-sub1.Ch():
t.Logf("component fake-1 state changed: %+v", state)
signalState(subErrCh1, &state)
signalState(subErrCh1, &state, []client.UnitState{client.UnitStateHealthy})
case state := <-sub2.Ch():
t.Logf("component fake-2 state changed: %+v", state)
signalState(subErrCh2, &state)
signalState(subErrCh2, &state, []client.UnitState{client.UnitStateHealthy})
}
}
}()
Expand Down Expand Up @@ -2344,6 +2344,219 @@ LOOP:
require.NoError(t, err)
}

func TestManager_FakeInput_OutputChange(t *testing.T) {
testPaths(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ai, _ := info.NewAgentInfo(true)
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
err := m.Run(ctx)
if errors.Is(err, context.Canceled) {
err = nil
}
errCh <- err
}()

waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second)
defer waitCancel()
if err := m.WaitForReady(waitCtx); err != nil {
require.NoError(t, err)
}

binaryPath := testBinary(t, "component")
runtimeSpec := component.InputRuntimeSpec{
InputType: "fake",
BinaryName: "",
BinaryPath: binaryPath,
Spec: fakeInputSpec,
}
components := []component.Component{
{
ID: "fake-0",
InputSpec: &runtimeSpec,
Units: []component.Unit{
{
ID: "fake-input-0-0",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy 0-0",
}),
},
{
ID: "fake-input-0-1",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy 0-1",
}),
},
{
ID: "fake-input-0-2",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy 0-1",
}),
},
},
},
}

components2 := []component.Component{
{
ID: "fake-1",
InputSpec: &runtimeSpec,
Units: []component.Unit{
{
ID: "fake-input-1-0",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy 0-0",
}),
},
{
ID: "fake-input-1-1",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy 0-1",
}),
},
{
ID: "fake-input-1-1",
Type: client.UnitTypeInput,
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy 0-1",
}),
},
},
},
}

type progressionStep struct {
componentID string
state ComponentState
}
stateProgression := make([]progressionStep, 0)

subCtx, subCancel := context.WithCancel(context.Background())
defer subCancel()
stateProgCh := make(chan progressionStep)
subErrCh0 := make(chan error)
subErrCh1 := make(chan error)
go func() {
sub0 := m.Subscribe(subCtx, "fake-0")
sub1 := m.Subscribe(subCtx, "fake-1")
for {
select {
case <-subCtx.Done():
close(stateProgCh)
return
case state := <-sub0.Ch():
t.Logf("component fake-0 state changed: %+v", state)
signalState(subErrCh0, &state, []client.UnitState{client.UnitStateHealthy, client.UnitStateStopped})
stateProgCh <- progressionStep{"fake-o", state}
case state := <-sub1.Ch():
t.Logf("component fake-1 state changed: %+v", state)
signalState(subErrCh1, &state, []client.UnitState{client.UnitStateHealthy})
stateProgCh <- progressionStep{"fake-o", state}
}
}
}()

go func() {
for step := range stateProgCh {
stateProgression = append(stateProgression, step)
}
}()

defer drainErrChan(errCh)
defer drainErrChan(subErrCh0)
defer drainErrChan(subErrCh1)

startTimer := time.NewTimer(100 * time.Millisecond)
defer startTimer.Stop()
select {
case <-startTimer.C:
err = m.Update(components)
require.NoError(t, err)
case err := <-errCh:
t.Fatalf("failed early: %s", err)
}

updateTimeout := 300 * time.Millisecond
if runtime.GOOS == windows {
// windows is slow, preventing flakyness
updateTimeout = 550 * time.Millisecond
}
updateTimer := time.NewTimer(updateTimeout)
defer updateTimer.Stop()
select {
case <-updateTimer.C:
err = m.Update(components2)
require.NoError(t, err)
case err := <-errCh:
t.Fatalf("failed early: %s", err)
}

count := 0
endTimer := time.NewTimer(30 * time.Second)
defer endTimer.Stop()
LOOP:
for {
select {
case <-endTimer.C:
t.Fatalf("timed out after 30 seconds")
case err := <-errCh:
require.NoError(t, err)
case err := <-subErrCh0:
require.NoError(t, err)
count++
if count >= 2 {
break LOOP
}
case err := <-subErrCh1:
require.NoError(t, err)
count++
if count >= 2 {
break LOOP
}
}
}

subCancel()
cancel()

// check progresstion, require stop fake-0 before start fake-1
wasStopped := false
for _, step := range stateProgression {
if step.componentID == "fake-0" && step.state.State == client.UnitStateStopped {
wasStopped = true
}
if step.componentID == "fake-1" && step.state.State == client.UnitStateStarting {
require.True(t, wasStopped)
break
}
}

err = <-errCh
require.NoError(t, err)
}

func newDebugLogger(t *testing.T) *logger.Logger {
t.Helper()

Expand All @@ -2366,7 +2579,7 @@ func drainErrChan(ch chan error) {
}
}

func signalState(subErrCh chan error, state *ComponentState) {
func signalState(subErrCh chan error, state *ComponentState, acceptableStates []client.UnitState) {
if state.State == client.UnitStateFailed {
subErrCh <- fmt.Errorf("component failed: %s", state.Message)
} else {
Expand All @@ -2375,7 +2588,7 @@ func signalState(subErrCh chan error, state *ComponentState) {
for key, unit := range state.Units {
if unit.State == client.UnitStateStarting {
// acceptable
} else if unit.State == client.UnitStateHealthy {
} else if isAcceptableState(unit.State, acceptableStates) {
healthy++
} else if issue == "" {
issue = fmt.Sprintf("unit %s in invalid state %v", key.UnitID, unit.State)
Expand All @@ -2390,6 +2603,15 @@ func signalState(subErrCh chan error, state *ComponentState) {
}
}

func isAcceptableState(state client.UnitState, acceptableStates []client.UnitState) bool {
for _, s := range acceptableStates {
if s == state {
return true
}
}
return false
}

func testPaths(t *testing.T) {
t.Helper()

Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *ServiceRuntime) stop(ctx context.Context, comm Communicator, lastChecki
}
}

s.log.Debug("uninstall %s service", name)
s.log.Debugf("uninstall %s service", name)
err := s.uninstall(ctx)
if err != nil {
s.log.Errorf("failed %s service uninstall, err: %v", name, err)
Expand Down