Fix possible blocking in the Coordinator and out-of-order state reporting in CoordinatorState #2736

Closed · wants to merge 6 commits
@@ -232,7 +232,7 @@ func (c *Coordinator) State() state.State {
// results in the subscription being unsubscribed.
//
// Note: Not reading from a subscription channel will cause the Coordinator to block.
func (c *Coordinator) StateSubscribe(ctx context.Context) *state.StateSubscription {
func (c *Coordinator) StateSubscribe(ctx context.Context) state.StateSubscription {
return c.state.Subscribe(ctx)
}
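
For reference, a minimal sketch of how a caller might consume the value-typed subscription returned here. The `watchState` helper and the logging are illustrative assumptions; only `StateSubscribe` and `Ch` come from this diff, and the `context`/`log` imports are assumed.

```go
// watchState is a hypothetical consumer: it reads every state update from a
// subscription until its context is cancelled. Per the discussion below, a
// subscriber that stops reading no longer blocks the others; it simply gets
// the most recent state the next time it reads from the channel.
func watchState(ctx context.Context, coord *Coordinator) {
	sub := coord.StateSubscribe(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case s := <-sub.Ch():
			log.Printf("coordinator state: %v (%s)", s.State, s.Message)
		}
	}
}
```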

59 changes: 59 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
@@ -314,6 +314,65 @@ func TestCoordinator_StateSubscribe(t *testing.T) {
	require.NoError(t, err)
}

func TestCoordinator_StateSubscribe_BlockedSubscriber(t *testing.T) {
	// Test that state subscribers cannot block each other by creating idle
	// subscribers that never read from their channel and confirming that the
	// active subscriber still receives state updates.
	coordCh := make(chan error)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coord, _, _ := createCoordinator(t)
	go func() {
		err := coord.Run(ctx)
		if errors.Is(err, context.Canceled) {
			// allowed error
			err = nil
		}
		coordCh <- err
	}()

	subCh := make(chan error)
	go func() {
		ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
		defer cancel()

		// Create three anonymous subscribers that will never be read, and one
		// active subscriber that will. This is to test that congestion in some
		// subscribers will not block state updates to others.
		coord.StateSubscribe(ctx)
		coord.StateSubscribe(ctx)
		coord.StateSubscribe(ctx)
		activeSub := coord.StateSubscribe(ctx)
		stateChangeCount := 0
		for {
			select {
			case <-ctx.Done():
				subCh <- ctx.Err()
				return
			case <-activeSub.Ch():
				stateChangeCount++
				if stateChangeCount >= 10 {
					// We have received 10 state updates even though the idle
					// subscribers are not listening at all; report success and stop.
					subCh <- nil
					return
				} else {
					// The error type here isn't important, this is just an easy way
					// to trigger a state update notification.
					coord.state.SetConfigManagerError(nil)
				}
			}
		}
	}()

	err := <-subCh
	require.NoError(t, err)
	cancel()

	err = <-coordCh
	require.NoError(t, err)
}

func TestCoordinatorWithErrors(t *testing.T) {
	handlerChan, runtime, varWatcher, config := setupAndWaitCoordinatorDone()

208 changes: 145 additions & 63 deletions internal/pkg/agent/application/coordinator/state/state.go
@@ -6,8 +6,8 @@ package state

import (
"context"
"reflect"
"sync"
"time"

agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"

@@ -50,8 +50,13 @@ type CoordinatorState struct {
actionsErr error
varsMgrErr error

subMx sync.RWMutex
subscribe []*StateSubscription
// StateSubscriptions sent to subscribeChan will receive state updates from
// stateReporter until their context is cancelled.
subscribeChan chan StateSubscription

// (*CoordinatorState).changed sends on this channel to notify stateReporter
// when the state changes.
stateChangedChan chan struct{}
}

type coordinatorOverrideState struct {
@@ -71,13 +76,24 @@ type stateSetterOpt func(ss *stateSetter)

// NewCoordinatorState creates the coordinator state manager.
func NewCoordinatorState(state agentclient.State, msg string, fleetState agentclient.State, fleetMsg string, logLevel logp.Level) *CoordinatorState {
return &CoordinatorState{
cs := &CoordinatorState{
state: state,
message: msg,
fleetState: fleetState,
fleetMessage: fleetMsg,
logLevel: logLevel,

// subscribeChan is synchronous: once Subscribe returns, the caller is
// guaranteed to be included in future state change notifications.
subscribeChan: make(chan StateSubscription),

// stateChangedChan is asynchronous with buffer size 1: this guarantees
// that state changes will propagate but multiple simultaneous changes
// will not accumulate.
stateChangedChan: make(chan struct{}, 1),
}
go cs.stateReporter()
Contributor:

What cleans up this goroutine? We should ensure that when the coordinator is stopped, this goroutine is also stopped.

Contributor Author:

See the other comments with @cmacknz -- the previous version left many goroutines that weren't cleaned up, so I thought reducing that to one would still be acceptable since CoordinatorState probably exists for the lifetime of the Coordinator. If that's not the case, what should determine the lifetime here?

Contributor:

That is true for the life of the Elastic Agent, but is not true for testing. I believe we should clean this up properly. The previous comment said that the goroutines you are speaking of were cleaned up when the context was cancelled, so they were cleaned up by the caller, no?
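
For illustration only (not code from this PR): one way to address the cleanup question is to bound the reporter goroutine with a context supplied by its owner, so that a stopping Coordinator (and tests) can shut it down deterministically. The function and channel names below are assumptions.

```go
// runReporter is a hypothetical sketch of a reporter goroutine bounded by its
// owner's context: the owner cancels ctx when it stops, and tests can wait on
// the returned channel for the goroutine to exit.
func runReporter(ctx context.Context, updates <-chan struct{}) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return // exits when the owner shuts down
			case <-updates:
				// fan the new state out to subscribers here
			}
		}
	}()
	return done
}
```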

return cs
}

// UpdateState updates the state triggering a change notification to subscribers.
@@ -192,20 +208,26 @@ func (cs *CoordinatorState) UpdateComponentState(state runtime.ComponentComponen

// State returns the current state for the coordinator.
func (cs *CoordinatorState) State() (s State) {
// We need to claim all three mutexes simultaneously, otherwise we may
// collect inconsistent states from the different components if one of them
// changes during this function call.
Member (@cmacknz, May 29, 2023):

This looks more correct, but I am trying to decide whether the number of mutexes we need to grab here is a code smell.

Given the number of locks we have and the number of deadlock bugs we keep having, I think our concurrency patterns can probably be improved but I don't have any quick fixes to suggest.

Thoughts?

Member:

Just brainstorming here, I could be waaay off 😅
Aside from locking the whole coordinator state (with a mutex on the whole object, à la synchronized in Java), cloning it, unlocking the mutex on the original and then starting to process the clone, not much comes to mind...
The same could be achieved with a value receiver, however:

  • we pay the cost of copying the whole CoordinatorState every time we call
  • we have mutexes and pointers within the struct so I am not sure that could work...

Contributor Author:

I do think the number of mutexes is a code smell -- there can be reasons for it but it's a hazard, especially when the assumptions aren't made explicit (e.g. functions that can/should be called with a mutex, or that can claim a mutex, should document that, and mutexes should document who uses them and why, otherwise it can give the impression of synchronization without consistent results).

Looking again at the ways the different mutexes are used, I see another potential concern: the way compStatesMx is used in UpdateComponentState in two stages with two calls to cs.changed() suggests that cs.changed() is being used intentionally to queue multiple changed states, with the expectation that a unit entering "stopped" state will produce two separate events for subscribers, one with the state changed to "stopped" and the second with the unit removed entirely.

The problem is, the new code doesn't guarantee that those will be sent to all subscribers as discrete states -- it intentionally sends subscribers only the most recent state -- but neither did the previous code, since states could be dropped or delivered out-of-order. Requiring subscribers to receive a distinct new state corresponding to every call to cs.changed() (instead of just the most recent update whenever they are able to receive one) would require much more careful handling, e.g. maintaining separate queues for each client. I think that would be so troublesome that we should prefer not to attempt it unless there are very strong reasons why it's needed (maybe @blakerouse knows more about the requirements?) -- otherwise maybe we should just switch to a single mutex, which would simplify UpdateComponentState and the overall synchronization pattern and would probably not cause bottleneck issues since none of the mutex-protected updates seem to require more than an array traversal.

Contributor Author:

(Related: if we do care about transmitting any particular state, rather than just "whatever is current," then cs.State() should never have acquired those mutexes at all; they should already be held by the caller. Otherwise, as soon as the caller releases them, there could be another change before cs.changed() takes effect, dropping the intended state.)

Contributor:

We want the subscriber to get all states in the order they occurred without missing a state. I believe without that, the state machine code that is used to resolve whether a state has been reached will miss states and cause issues with pkg/testing.NewFixture.

So I do believe we will need to create a queue per subscriber.

Contributor Author:

Ok -- then this probably needs to be escalated because this has serious implications for how different components talk to each other. Who are the stakeholders for this? Making this kind of guarantee is expensive and we should pin down the specific requirements -- this requirement means there are dramatically more state leaks in the baseline code than it seemed like, since none of these calls generated their reported state while still holding the mutex they used to change it. I think we would get dramatically better reliability for our effort if we could find a way to avoid this, but if we need to do it then let's figure out how best to limit the scope.

Member:

We want the subscriber to get all states in the order they occurred without missing a state.

This is the ideal, but reading this PR I'm not convinced we have correctly achieved it. Under low load with few state transitions we may have, but we have no tests to guarantee that states are always observed correctly for all possible state, goroutine, or lock orderings. This PR is showing that if we had those, they would fail. I suspect this type of locking bug also exists in other places.

This PR exchanges a random chance of reading states out of order or not at all for a deterministic guarantee that an observer always reads the current state at the time it is ready to read the state again. I think this is strictly an improvement. If the rate of state changes is reasonable, and the watcher is never blocked for significant amounts of time, this likely works just fine and can never block the coordinator.

There are probably ways to improve this further; one obvious one mentioned previously is to add a queue per subscriber, but this isn't entirely straightforward either. Unless we allow a queue with unbounded growth, a slow subscriber could still block the coordinator.

I think our concurrency patterns can be improved, here and elsewhere. I don't want to turn this comment in a large debate, so I am going to add this PR to the agenda of the next agent core meeting and invite Fae. I know she has some good ideas for other ways to approach this, and is also going to be evaluating the race detector failures in #2743 which might give us some other hints.

Contributor:

We want the subscriber to get all states in the order they occurred without missing a state.

This is the ideal, but reading this PR I'm not convinced we have correctly achieved it. Under low load with few state transitions we may have, but we have no tests to guarantee that states are always observed correctly for all possible state, goroutine, or lock orderings. This PR is showing that if we had those, they would fail. I suspect this type of locking bug also exists in other places.

That is not true. It was achieved, as it would actually block if it couldn't deliver the state. Please show how it was not actually achieved. Even if that was the case, the state machine code still required it, and it would have caused a bug and resulted in failed tests.

This PR exchanges a random chance of reading states out of order or not at all with a deterministic guarantee that an observer always reads the current state at the time it is ready to read the state again. I think this is strictly an improvement. If the rate of state changes is reasonable, and the watcher is never blocked for significant amounts of time this likely works just fine and can never block the coordinator.

True, if the reader of the channel is reading fast enough it won't matter, but this change makes the chance of missing a state much larger. It now relies on the Go runtime scheduling often enough not to miss updates; the other implementation did not have that problem, as it blocked for each state change.

There are probably ways to improve this further, one obvious one mentioned previously is to add a queue per subscriber but this isn't entirely straightforward either. Unless we allow a queue with unbounded growth a slow subscriber could still block the coordinator.

As I said in my comment, using a ring buffer with a limit would make this non-blocking, with states always received in the correct order. Only in the case that the reader was really behind would it then miss state changes, once the ring buffer cycles. This is not complicated to implement or understand.
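
A minimal sketch of that bounded ring buffer, as one per-subscriber queue. The type, capacity handling, and locking here are assumptions, and a delivery goroutine draining it into the subscriber channel would still be needed.

```go
// stateRing is a hypothetical bounded ring buffer for one subscriber: push
// never blocks, states come out in order, and only when the reader falls more
// than len(buf) states behind are the oldest entries overwritten.
type stateRing struct {
	mu    sync.Mutex
	buf   []State
	head  int // index of the oldest queued state
	count int // number of queued states
}

func newStateRing(size int) *stateRing {
	return &stateRing{buf: make([]State, size)}
}

func (r *stateRing) push(s State) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.count == len(r.buf) {
		// The reader is too far behind: overwrite the oldest state.
		r.buf[r.head] = s
		r.head = (r.head + 1) % len(r.buf)
		return
	}
	r.buf[(r.head+r.count)%len(r.buf)] = s
	r.count++
}

func (r *stateRing) pop() (State, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.count == 0 {
		return State{}, false
	}
	s := r.buf[r.head]
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return s, true
}
```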

I think our concurrency patterns can be improved, here and elsewhere. I don't want to turn this comment in a large debate, so I am going to add this PR to the agenda of the next agent core meeting and invite Fae. I know she has some good ideas for other ways to approach this, and is also going to be evaluating the race detector failures in #2743 which might give us some other hints.

Sounds good to me.

cs.mx.RLock()
cs.compStatesMx.RLock()
cs.mgrMx.RLock()
defer cs.mx.RUnlock()
defer cs.compStatesMx.RUnlock()
defer cs.mgrMx.RUnlock()

s.State = cs.state
s.Message = cs.message
s.FleetState = cs.fleetState
s.FleetMessage = cs.fleetMessage
s.LogLevel = cs.logLevel
overrideState := cs.overrideState
cs.mx.RUnlock()

// copy component states for PIT
cs.compStatesMx.RLock()
compStates := make([]runtime.ComponentComponentState, len(cs.compStates))
copy(compStates, cs.compStates)
cs.compStatesMx.RUnlock()
s.Components = compStates

if overrideState != nil {
@@ -217,8 +239,6 @@ func (cs *CoordinatorState) State() (s State) {
// or
// coordinator overall is reported is healthy; in the case any component or unit is not healthy then we report
// as degraded because we are not fully healthy
cs.mgrMx.RLock()
defer cs.mgrMx.RUnlock()
if cs.runtimeMgrErr != nil {
s.State = agentclient.Failed
s.Message = cs.runtimeMgrErr.Error()
@@ -246,79 +266,141 @@ func (cs *CoordinatorState) State() (s State) {
//
// This provides the current state at the time of first subscription. Cancelling the context
// results in the subscription being unsubscribed.
//
// Note: Not reading from a subscription channel will cause the Coordinator to block.
func (cs *CoordinatorState) Subscribe(ctx context.Context) *StateSubscription {
sub := newStateSubscription(ctx, cs)

// send initial state
state := cs.State()
go func() {
select {
case <-ctx.Done():
return
case sub.ch <- state:
}
}()

// add subscription for future changes
cs.subMx.Lock()
cs.subscribe = append(cs.subscribe, sub)
cs.subMx.Unlock()

go func() {
<-ctx.Done()

// unsubscribe
cs.subMx.Lock()
defer cs.subMx.Unlock()
for i, s := range cs.subscribe {
if sub == s {
cs.subscribe = append(cs.subscribe[:i], cs.subscribe[i+1:]...)
return
}
}
}()

func (cs *CoordinatorState) Subscribe(ctx context.Context) StateSubscription {
sub := newStateSubscription(ctx)
cs.subscribeChan <- sub
return sub
}

func (cs *CoordinatorState) changed() {
cs.sendState(cs.State())
// Try to send to stateChangedChan but don't block -- if its buffer is full
// then an update is already pending, and the changes we're reporting will
// be included.
select {
case cs.stateChangedChan <- struct{}{}:
default:
}
}

func (cs *CoordinatorState) sendState(state State) {
cs.subMx.RLock()
defer cs.subMx.RUnlock()
func (cs *CoordinatorState) stateReporter() {
var subscribers []StateSubscription
// We support a variable number of subscribers and we don't want any of them
// to block each other or the CoordinatorState as a whole, so we need to
// listen with reflect.Select, which selects on an array of cases.
// Unfortunately this means we need to track the active select cases
// ourselves, including their position in the array.
//
// The ordering we use is:
// - first, the listener on subscribeChan
// - second, the listener on stateChangedChan
// - after that, two cases for each subscriber, in the same order as the
// subscribers array: first its done channel, then its listener channel.
//
// All subscribers are included in the array of select cases even when some
// have already been updated, that way we don't need to worry about the
// order changing. Instead, subscribers that have already been updated have
// the listener channel of their select case set to nil.
selectCases := []reflect.SelectCase{
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cs.subscribeChan),
},
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(cs.stateChangedChan),
},
}
const newSubscriberIndex = 0
const stateChangedIndex = 1
const firstSubscriberIndex = 2

currentState := cs.State()

// resetListeners is called when a state change notification arrives, to
// reactivate the listener channels of all subscribers and update their
// select case to the new state value.
resetListeners := func() {
currentState = cs.State()
for i, subscriber := range subscribers {
listenerIndex := firstSubscriberIndex + 2*i + 1
selectCases[listenerIndex].Chan = reflect.ValueOf(subscriber.ch)
selectCases[listenerIndex].Send = reflect.ValueOf(currentState)
}
}

// addSubscriber is a helper to add a new subscriber and its select
// cases to our lists.
addSubscriber := func(subscriber StateSubscription) {
subscribers = append(subscribers, subscriber)
selectCases = append(selectCases,
// Add a select case receiving from the subscriber's done channel
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(subscriber.ctx.Done()),
},
// Add a select case sending to the subscriber's listener channel
reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(subscriber.ch),
Send: reflect.ValueOf(currentState),
})
}

send := func(sub *StateSubscription) {
t := time.NewTimer(time.Second)
defer t.Stop()
for {
Member:

As far as I can tell, the lifetime of the coordinator is the same as the lifetime of the agent process, so this never exiting is likely fine. Is that understanding correct?

Seems like the entrypoint that would create this follows from

coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, composable, caps, monitor, isManaged, compModifiers...)

Contributor Author:

Yes, that's my understanding as well. I figured it was ok to leave this dangling since the previous version did the same with its own helper goroutines (every listener previously spawned a separate goroutine that did nothing but wait for that listener's context to end, which is no longer needed in this version), and there's no obvious context bounding CoordinatorState's lifecycle.

// Always try a standalone receive on the state changed channel first, so
// it gets priority over other updates.
select {
case <-sub.ctx.Done():
case sub.ch <- state:
case <-t.C:
// subscriber didn't read from the channel after 1 second; so we unblock
case <-cs.stateChangedChan:
resetListeners()
default:
}
}

for _, sub := range cs.subscribe {
send(sub)
chosen, value, _ := reflect.Select(selectCases)
if chosen == stateChangedIndex {
resetListeners()
} else if chosen == newSubscriberIndex {
subscriber, ok := value.Interface().(StateSubscription)
if ok {
addSubscriber(subscriber)
}
} else {
subscriberIndex := (chosen - firstSubscriberIndex) / 2
if (chosen-firstSubscriberIndex)%2 == 0 {
// The subscriber's done channel has been closed, remove
Member:

I don't think we have a test where a subscriber is removed.

I went to measure the test coverage of this code manually and the Go tooling won't give me a test report for this package because the state implementation is alone in its own package. We should just move this into the coordinator package or move the tests into the state package to make this possible.
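
A rough test sketch for the removal path, reusing the helpers from the test above (createCoordinator, coord.state.SetConfigManagerError); the test name and the exact assertions are assumptions, not part of this PR.

```go
func TestCoordinator_StateSubscribe_CancelledSubscriber(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	coord, _, _ := createCoordinator(t)
	go func() { _ = coord.Run(ctx) }()

	// Subscribe with a context that is cancelled immediately; the reporter
	// should drop this subscriber once its done channel fires.
	subCtx, subCancel := context.WithCancel(ctx)
	coord.StateSubscribe(subCtx) // never read from
	subCancel()

	// A live subscriber must keep receiving updates, which shows the reporter
	// is not stuck trying to deliver to the cancelled subscription.
	activeSub := coord.StateSubscribe(ctx)
	for i := 0; i < 3; i++ {
		coord.state.SetConfigManagerError(nil)
		select {
		case <-activeSub.Ch():
		case <-time.After(30 * time.Second):
			t.Fatal("timed out waiting for a state update after cancelling a subscriber")
		}
	}
}
```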

Contributor (@blakerouse, Jun 5, 2023):

It was moved into its own package to ensure that the coordinator only interfaces with the state management using the public interface. This is to ensure that locks and other pieces of the state are not touched directly by the coordinator.

// them from our lists
subscribers = append(
subscribers[:subscriberIndex],
subscribers[subscriberIndex+1:]...)
selectCases = append(
selectCases[:chosen],
selectCases[chosen+2:]...)
} else {
// We successfully sent a state update to this subscriber, turn off
// its listener channel until we receive a new state change.
selectCases[chosen].Chan = reflect.ValueOf(nil)
}
}
}
}

// StateSubscription provides a channel for notifications of state changes.
type StateSubscription struct {
// When this context expires the subscription will be cancelled by
// CoordinatorState.stateReporter.
ctx context.Context
cs *CoordinatorState
ch chan State

// When the state changes, the new state will be sent to this channel.
// If multiple state changes accumulate before the receiver reads from this
// channel, then only the most recent one will be sent.
ch chan State
}

func newStateSubscription(ctx context.Context, cs *CoordinatorState) *StateSubscription {
return &StateSubscription{
func newStateSubscription(ctx context.Context) StateSubscription {
return StateSubscription{
ctx: ctx,
cs: cs,
ch: make(chan State),
// The subscriber channel is unbuffered so it always gets the most recent
// state at the time it receives from the channel.
ch: make(chan State),
}
}
