
Fix possible blocking in the Coordinator and out-of-order state reporting in CoordinatorState #2736

Closed
faec wants to merge 6 commits

Conversation


@faec faec commented May 26, 2023

Fix the issues discussed in #2735 by reimplementing the CoordinatorState state subscription with reflect.Select. The new design can't be blocked by idle subscribers, even for a short period, and values sent to subscribers always reflect the most current state, not a possibly queued / out-of-order earlier state.

Fixes #2735.
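For context, here is a minimal sketch of the reflect.Select technique described above, under assumed names (stateChanged, getState, the StateSubscription fields, and the pending bookkeeping are illustrative, and subscription registration and shutdown are omitted). It shows the key property: the loop offers each subscriber only the most recent state and is never blocked by an idle subscriber.

```go
// Package statewatch is a standalone sketch; it is not the elastic-agent code.
package statewatch

import "reflect"

// State stands in for the coordinator state snapshot.
type State struct{ /* ... */ }

// StateSubscription is an assumed shape for a subscriber: a channel that
// receives updates and a done channel closed on unsubscribe.
type StateSubscription struct {
	ch   chan State
	done chan struct{}
}

// stateReporter offers the most recent state to every subscriber using
// reflect.Select, so an idle subscriber can never block the loop and every
// delivered value is current as of the moment it is received.
func stateReporter(stateChanged <-chan struct{}, getState func() State, subs []*StateSubscription) {
	state := getState()
	pending := make(map[*StateSubscription]bool, len(subs))
	for _, s := range subs {
		pending[s] = true
	}
	for {
		// Case 0 always listens for a state-change notification
		// (shutdown handling is omitted; stateChanged is assumed to stay open).
		cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(stateChanged)}}
		targets := []*StateSubscription{nil}
		for _, sub := range subs {
			// Each subscriber contributes a receive on its done channel...
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.done)})
			targets = append(targets, sub)
			if pending[sub] {
				// ...and, if it hasn't seen the latest state yet, a send of it.
				cases = append(cases, reflect.SelectCase{
					Dir: reflect.SelectSend, Chan: reflect.ValueOf(sub.ch), Send: reflect.ValueOf(state),
				})
				targets = append(targets, sub)
			}
		}
		chosen, _, _ := reflect.Select(cases)
		switch {
		case chosen == 0:
			// New state: refresh it and mark every subscriber as pending.
			state = getState()
			for _, s := range subs {
				pending[s] = true
			}
		case cases[chosen].Dir == reflect.SelectRecv:
			// A done channel fired: drop that subscriber.
			sub := targets[chosen]
			delete(pending, sub)
			for i, s := range subs {
				if s == sub {
					subs = append(subs[:i], subs[i+1:]...)
					break
				}
			}
		default:
			// The send succeeded; this subscriber now has the latest state.
			pending[targets[chosen]] = false
		}
	}
}
```

reflect.Select is used because the set of subscriber channels is only known at runtime, so an ordinary select statement with a fixed case list cannot express it.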

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in ./changelog/fragments using the changelog tool
  • I have added an integration test or an E2E test

@faec faec added the bug (Something isn't working) and Team:Elastic-Agent (Label for the Agent team) labels on May 26, 2023
@faec faec self-assigned this May 26, 2023

mergify bot commented May 26, 2023

This pull request does not have a backport label. Could you fix it @faec? 🙏
To fix up this pull request, you need to add the backport labels for the needed branches, such as:

  • backport-v\d.\d.\d is the label to automatically backport to the 8.\d branch (\d is the digit)

NOTE: backport-skip has been added to this pull request.

elasticmachine commented May 26, 2023

💚 Build Succeeded


Build stats

  • Start Time: 2023-05-29T12:35:11.922+0000

  • Duration: 19 min 48 sec

Test stats 🧪

Test Results: Failed 0 · Passed 5667 · Skipped 19 · Total 5686

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages.

  • run integration tests : Run the Elastic Agent Integration tests.

  • run end-to-end tests : Generate the packages and run the E2E Tests.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

elasticmachine commented May 26, 2023

🌐 Coverage report

Name Metrics % (covered/total) Diff
Packages 98.611% (71/72) 👍
Files 68.273% (170/249) 👍
Classes 67.021% (315/470) 👍
Methods 53.678% (956/1781) 👍
Lines 39.497% (11007/27868) 👎 -0.014
Conditionals 100.0% (0/0) 💚

@faec faec marked this pull request as ready for review May 26, 2023 20:51
@faec faec requested a review from a team as a code owner May 26, 2023 20:51
@faec faec requested a review from pchila May 26, 2023 22:10
@cmacknz (Member) left a comment:

Nice! I didn't realize reflect.Select existed before this PR.

send := func(sub *StateSubscription) {
    t := time.NewTimer(time.Second)
    defer t.Stop()
    for {
Member:

As far as I can tell, the lifetime of the coordinator is the same as the lifetime of the agent process, so this never exiting is likely fine. Is that understanding correct?

Seems like the entrypoint that would create this follows from

coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, composable, caps, monitor, isManaged, compModifiers...)

Contributor Author (@faec):

Yes, that's my understanding as well. I figured it was ok to leave this dangling since the previous version did the same with its own helper goroutines (every listener previously spawned a separate goroutine that did nothing but wait for that listener's context to end, which is no longer needed in this version), and there's no obvious context bounding CoordinatorState's lifecycle.
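For readers following along, the per-listener helper goroutine pattern being described might have looked roughly like this; it is an assumed reconstruction for illustration, not the actual previous code:

```go
package oldpattern

import (
	"context"
	"sync"
)

type State struct{ /* ... */ }

type StateSubscription struct{ ch chan State }

type CoordinatorState struct {
	mx   sync.Mutex
	subs []*StateSubscription
}

// Subscribe sketches the described old pattern: every subscription spawns a
// goroutine whose only job is to wait for that subscriber's context to end
// and then remove it, so each listener costs one extra dangling goroutine
// until its context is cancelled.
func (cs *CoordinatorState) Subscribe(ctx context.Context) *StateSubscription {
	sub := &StateSubscription{ch: make(chan State)}
	cs.mx.Lock()
	cs.subs = append(cs.subs, sub)
	cs.mx.Unlock()

	go func() {
		<-ctx.Done() // wait only for this listener's context to end
		cs.mx.Lock()
		defer cs.mx.Unlock()
		for i, s := range cs.subs {
			if s == sub {
				cs.subs = append(cs.subs[:i], cs.subs[i+1:]...)
				break
			}
		}
	}()
	return sub
}
```

Each subscription costs one goroutine that does nothing but wait, which is what the single reporter goroutine in this PR replaces.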

@@ -192,20 +208,26 @@ func (cs *CoordinatorState) UpdateComponentState(state runtime.ComponentComponen

// State returns the current state for the coordinator.
func (cs *CoordinatorState) State() (s State) {
    // We need to claim all three mutexes simultaneously, otherwise we may
    // collect inconsistent states from the different components if one of them
    // changes during this function call.
@cmacknz (Member) commented May 29, 2023:

This looks more correct, but I am trying to decide whether the number of mutexes we need to grab here is a code smell.

Given the number of locks we have and the number of deadlock bugs we keep having, I think our concurrency patterns can probably be improved but I don't have any quick fixes to suggest.

Thoughts?

Member:

Just brainstorming here, I could be waaay off 😅
Aside from locking the whole coordinator state (with a mutex on the whole object, à la synchronized in Java), cloning it, unlocking the mutex on the original, and then processing the copy, not much comes to mind (a rough sketch of that idea follows after this list)...
The same could be achieved with a value receiver; however:

  • we pay the cost of copying the whole CoordinatorState on every call
  • we have mutexes and pointers within the struct, so I am not sure that would work...
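
A rough sketch of that clone-under-a-single-lock idea, assuming placeholder types and field names rather than the real CoordinatorState:

```go
package statesnapshot

import "sync"

// State is a plain-data snapshot with no mutexes or shared pointers, so it
// can be copied and handed to callers safely.
type State struct {
	Coordinator string
	Components  []string
}

// CoordinatorState guards everything with a single mutex instead of three.
type CoordinatorState struct {
	mx    sync.Mutex
	state State
}

// State locks, clones, unlocks, and lets the caller process the copy, so a
// reader can never observe a half-updated mix of fields.
func (cs *CoordinatorState) State() State {
	cs.mx.Lock()
	defer cs.mx.Unlock()
	s := cs.state
	// Copy slice contents so later updates can't mutate the caller's snapshot.
	s.Components = append([]string(nil), cs.state.Components...)
	return s
}
```

The copy cost grows with the size of the state, which is the first bullet's concern, and the mutexes and pointers in the real struct would have to be kept out of the copied snapshot, which is the second.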

Contributor Author (@faec):

I do think the number of mutexes is a code smell -- there can be reasons for it but it's a hazard, especially when the assumptions aren't made explicit (e.g. functions that can/should only be called with a mutex held, or that can claim a mutex, should document that, and mutexes should document who uses them and why; otherwise it can give the impression of synchronization without consistent results).

Looking again at the ways the different mutexes are used, I see another potential concern: the way compStatesMx is used in UpdateComponentState in two stages with two calls to cs.changed() suggests that cs.changed() is being used intentionally to queue multiple changed states, with the expectation that a unit entering "stopped" state will produce two separate events for subscribers, one with the state changed to "stopped" and the second with the unit removed entirely.

The problem is, the new code doesn't guarantee that those will be sent to all subscribers as discrete states -- it intentionally sends subscribers only the most recent state -- but neither did the previous code, since states could be dropped or delivered out-of-order. Requiring subscribers to receive a distinct new state corresponding to every call to cs.changed() (instead of just the most recent update whenever they are able to receive one) would require much more careful handling, e.g. maintaining separate queues for each client. I think that would be so troublesome that we should prefer not to attempt it unless there are very strong reasons why it's needed (maybe @blakerouse knows more about the requirements?) -- otherwise maybe we should just switch to a single mutex, which would simplify UpdateComponentState and the overall synchronization pattern and would probably not cause bottleneck issues since none of the mutex-protected updates seem to require more than an array traversal.

Contributor Author (@faec):

(Related: if we do care about transmitting any particular state, rather than just "whatever is current," then cs.State() should never have acquired those mutexes at all; they should already be held by the caller. Otherwise, as soon as the caller releases them, there could be another change before cs.changed() takes effect, dropping the intended state.)

Contributor:

We want the subscriber to get all states in the order they occurred without missing a state. I believe that without this, the state machine code used to resolve whether a state has been reached will miss states and cause issues with pkg/testing.NewFixture.

So I do believe we will need to create a queue per subscriber.

Contributor Author (@faec):

Ok -- then this probably needs to be escalated because this has serious implications for how different components talk to each other. Who are the stakeholders for this? Making this kind of guarantee is expensive and we should pin down the specific requirements -- this requirement means there are dramatically more state leaks in the baseline code than it seemed like, since none of these calls generated their reported state while still holding the mutex they used to change it. I think we would get dramatically better reliability for our effort if we could find a way to avoid this, but if we need to do it then let's figure out how best to limit the scope.

Member:

> We want the subscriber to get all states in the order they occurred without missing a state.

This is the ideal, but reading this PR I'm not convinced we have correctly achieved it. Under low load with few state transitions we may have, but we have no tests to guarantee that states are always observed correctly for all possible state, goroutine, or lock orderings. This PR is showing that if we had those, they would fail. I suspect this type of locking bug also exists in other places.

This PR exchanges a random chance of reading states out of order or not at all with a deterministic guarantee that an observer always reads the current state at the time it is ready to read the state again. I think this is strictly an improvement. If the rate of state changes is reasonable, and the watcher is never blocked for significant amounts of time, this likely works just fine and can never block the coordinator.

There are probably ways to improve this further; one obvious one mentioned previously is to add a queue per subscriber, but this isn't entirely straightforward either. Unless we allow a queue with unbounded growth, a slow subscriber could still block the coordinator.

I think our concurrency patterns can be improved, here and elsewhere. I don't want to turn this comment into a large debate, so I am going to add this PR to the agenda of the next agent core meeting and invite Fae. I know she has some good ideas for other ways to approach this, and is also going to be evaluating the race detector failures in #2743 which might give us some other hints.

Contributor:

> We want the subscriber to get all states in the order they occurred without missing a state.

> This is the ideal, but reading this PR I'm not convinced we have correctly achieved it. Under low load with few state transitions we may have, but we have no tests to guarantee that states are always observed correctly for all possible state, goroutine, or lock orderings. This PR is showing that if we had those, they would fail. I suspect this type of locking bug also exists in other places.

That is not true. It was achieved, as it would actually block if it couldn't deliver the state. Please show how it was not actually achieved. Even if that were the case, the state machine code still required it, and it would have caused a bug and resulted in failed tests.

> This PR exchanges a random chance of reading states out of order or not at all with a deterministic guarantee that an observer always reads the current state at the time it is ready to read the state again. I think this is strictly an improvement. If the rate of state changes is reasonable, and the watcher is never blocked for significant amounts of time this likely works just fine and can never block the coordinator.

True, if the reader of the channel is reading fast enough it won't matter, but this change makes it much more likely that states are missed. It now relies on the Go runtime scheduling the reader often enough to not miss an update; the other implementation did not have that problem because it blocked for each state change.

> There are probably ways to improve this further, one obvious one mentioned previously is to add a queue per subscriber but this isn't entirely straightforward either. Unless we allow a queue with unbounded growth a slow subscriber could still block the coordinator.

As I said in my comment, using a ring buffer with a limit would make this non-blocking while still delivering states in the correct order. Only if the reader fell so far behind that the ring buffer cycled would it miss state changes. This is not complicated to implement or understand.

> I think our concurrency patterns can be improved, here and elsewhere. I don't want to turn this comment into a large debate, so I am going to add this PR to the agenda of the next agent core meeting and invite Fae. I know she has some good ideas for other ways to approach this, and is also going to be evaluating the race detector failures in #2743 which might give us some other hints.

Sounds good to me.

} else {
    subscriberIndex := (chosen - firstSubscriberIndex) / 2
    if (chosen-firstSubscriberIndex)%2 == 0 {
        // The subscriber's done channel has been closed, remove
Member:

I don't think we have a test where a subscriber is removed.

I went to measure the test coverage of this code manually and the Go tooling won't give me a test report for this package because the state implementation is alone in its own package. We should just move this into the coordinator package or move the tests into the state package to make this possible.

@blakerouse (Contributor) commented Jun 5, 2023:

It was moved into its own package to ensure that the coordinator only interfaces with the state management using the public interface. This is to ensure that locks and other pieces of the state are not touched directly by the coordinator.

@blakerouse (Contributor) left a comment:

See my inline comment about a queue per subscriber. Each subscriber does need each state as it occurs and cannot miss a transition.

}
go cs.stateReporter()
Contributor:

What cleans up this goroutine? We should ensure that when the coordinator is stopped, this goroutine is also stopped.

Contributor Author (@faec):

See the other comments with @cmacknz -- the previous version left many goroutines that weren't cleaned up, so I thought reducing that to one would still be acceptable since CoordinatorState probably exists for the lifetime of the Coordinator. If that's not the case, what should determine the lifetime here?


Contributor:

That is true for the life of the Elastic Agent, but it is not true for testing. I believe we should clean this up properly. The previous comment said that the goroutines you are speaking of were cleaned up when the context was cancelled, so they were cleaned up by the caller, no?
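
One possible way to bound the reporter goroutine's lifetime, sketched under the assumption that the coordinator has a context it cancels on shutdown; changeNotify and broadcast are illustrative stand-ins, not identifiers from this PR:

```go
package reporterlife

import "context"

// reporter is an illustrative stand-in for the state-reporting loop.
type reporter struct {
	changeNotify chan struct{} // signalled whenever the state changes
	broadcast    func()        // pushes the latest state to subscribers
}

// run exits as soon as ctx is cancelled, so stopping the coordinator (or a
// test fixture) also stops the goroutine instead of leaking it.
func (r *reporter) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-r.changeNotify:
			r.broadcast()
		}
	}
}
```

Starting it with go r.run(coordinatorCtx) would then tie cleanup to whatever already stops the coordinator, including test fixtures.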


@blakerouse (Contributor) commented:

What I would prefer to see in this change is that each subscriber gets its own queue of state changes. If the subscriber is slow then updates get queued; if the subscriber is so slow that the queue fills up, then older messages can be dropped. Hopefully that would not happen, but we should do our best to ensure that every state change is sent to the subscriber; missing a state change will break the state machine logic used in the testing framework, which requires that it receives every state change.
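
A small sketch of what such a bounded per-subscriber queue could look like; the names are illustrative, and the drop-oldest policy is just one way to cap growth without blocking the sender:

```go
package statequeue

// State stands in for a coordinator state snapshot.
type State struct{ /* ... */ }

// stateQueue is a fixed-capacity ring buffer of states for one subscriber.
// Pushing never blocks; when the buffer is full the oldest entry is dropped,
// so only a badly lagging subscriber ever misses a state, and the states it
// does see stay in order.
type stateQueue struct {
	buf   []State
	start int // index of the oldest queued state
	count int // number of queued states
}

func newStateQueue(capacity int) *stateQueue {
	return &stateQueue{buf: make([]State, capacity)}
}

// push enqueues a state, overwriting the oldest one if the queue is full.
func (q *stateQueue) push(s State) {
	if q.count == len(q.buf) {
		// Full: the slot at start holds the oldest state; overwrite it and
		// advance start so ordering is preserved.
		q.buf[q.start] = s
		q.start = (q.start + 1) % len(q.buf)
		return
	}
	q.buf[(q.start+q.count)%len(q.buf)] = s
	q.count++
}

// pop removes and returns the oldest queued state, if any.
func (q *stateQueue) pop() (State, bool) {
	if q.count == 0 {
		return State{}, false
	}
	s := q.buf[q.start]
	q.start = (q.start + 1) % len(q.buf)
	q.count--
	return s, true
}
```

Access from the coordinator and the subscriber would still need synchronization (a mutex plus a wake-up signal, for example), which is omitted here.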


mergify bot commented Jun 7, 2023

This pull request now has conflicts. Could you fix it? 🙏
To fix up this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b statewatch-fix upstream/statewatch-fix
git merge upstream/main
git push upstream statewatch-fix


faec commented Jun 12, 2023

In last week's Agent core team meeting we decided to rewrite this as a modular component with stronger delivery guarantees, see #2819. Closing this one, and will open the new implementation as its own PR later this week.

Labels: backport-skip, bug (Something isn't working), skip-changelog, Team:Elastic-Agent (Label for the Agent team)
Projects: None yet
Development: Successfully merging this pull request may close these issues:
  • CoordinatorState can block / deliver state updates out of order
5 participants