Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create non-blocking broadcaster helper and use it to manage Coordinator state notifications #2849

Merged
merged 44 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
bb65a47
fix race detector errors in TestCoordinatorDiagnosticHooks
faec Jun 9, 2023
32288f7
Draft in progress
faec Jun 9, 2023
12a071c
finish first round of Coordinator updates
faec Jun 9, 2023
607450d
update dependents of the state package
faec Jun 9, 2023
087c649
Broadcaster in progress
faec Jun 12, 2023
41cd7c6
finish first draft
faec Jun 12, 2023
609c01d
update comments in Coordinator
faec Jun 12, 2023
8b65cd9
Broadcast state updates from the Coordinator run loop
faec Jun 12, 2023
3f61731
Finish initializing the broadcaster
faec Jun 12, 2023
ffb3327
add license comments
faec Jun 12, 2023
bbaa0af
test fix
faec Jun 12, 2023
cb4e51b
add more tests, fix the bugs they found
faec Jun 12, 2023
b8eb439
add another unit test
faec Jun 12, 2023
4ad966c
Split Coordinator inner loop into a standalone helper function
faec Jun 13, 2023
e630de4
fix error message
faec Jun 13, 2023
ed158af
correct LogLevel access
faec Jun 13, 2023
62a64b0
Run upgrade tests against a running Coordinator, and make Coordinator…
faec Jun 13, 2023
1e80301
Merge branch 'main' of github.com:elastic/elastic-agent into coordina…
faec Jun 13, 2023
9c3001c
troubleshooting diagnostics test
faec Jun 13, 2023
361e8b4
move manager communication channels into an internal struct
faec Jun 13, 2023
4e8b2e3
Move some fields around, add comments
faec Jun 14, 2023
76cd507
more tests, more fixes
faec Jun 14, 2023
8b8205d
remove debug printfs :-P
faec Jun 14, 2023
f163bd6
Add/document input buffer parameter, fix associated test races
faec Jun 20, 2023
06c96b2
more tests more tests
faec Jun 20, 2023
1c70700
add comment
faec Jun 21, 2023
981f89d
Merge branch 'main' of github.com:elastic/elastic-agent into coordina…
faec Jun 21, 2023
833d257
mage check
faec Jun 21, 2023
d9a53ff
remove debug printfs
faec Jun 21, 2023
d156df1
Add test changing Coordinator's component model
faec Jun 21, 2023
6e03868
fix protobuf sensitivity in config tests
faec Jun 21, 2023
4f93d91
three coordinator tests left
faec Jun 21, 2023
503154f
add test applying variable updates to policy
faec Jun 21, 2023
a25e268
add override state test
faec Jun 21, 2023
02937ac
add simple upgrade test
faec Jun 21, 2023
0e61643
update todo
faec Jun 21, 2023
b0f8ac2
Finish broadcaster tests
faec Jun 21, 2023
d8358b0
rewiring diagnostics_test
faec Jun 22, 2023
911f65b
replacement diagnostics test in progress
faec Jun 22, 2023
8409573
Remove mocks and golden files from old diagnostics test
faec Jun 22, 2023
d553a8b
make check
faec Jun 22, 2023
9204694
Add more diagnostic tests
faec Jun 23, 2023
c46f9cb
add state diagnostic test
faec Jun 23, 2023
d885df4
Merge branch 'main' of github.com:elastic/elastic-agent into coordina…
faec Jun 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"fmt"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator/state"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
Expand Down Expand Up @@ -171,7 +169,7 @@ func readMapString(m map[string]interface{}, key string, def string) string {
return def
}

func findUnitFromInputType(state state.State, inputType string) (component.Component, component.Unit, bool) {
func findUnitFromInputType(state coordinator.State, inputType string) (component.Component, component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput && unit.Config != nil && unit.Config.Type == inputType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
go func() {
h.log.Infof("starting upgrade to version %s in background", action.Version)
if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false); err != nil {

h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
// If context is cancelled in getAsyncContext, the actions are acked there
if !errors.Is(asyncCtx.Err(), context.Canceled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,59 +51,102 @@ func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error {
}

func TestUpgradeHandler(t *testing.T) {
// Create a cancellable context that will shut down the coordinator after
// the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)
msgChan := make(chan string)
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
specs := component.RuntimeSpecs{}
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)

// Create and start the coordinator
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

u := NewUpgrade(log, c)
ctx := context.Background()
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
ack := noopacker.New()
err := u.Handle(ctx, &a, ack)
require.NoError(t, err)
msg := <-msgChan
require.Equal(t, "completed 8.3.0", msg)
}

func TestUpgradeHandlerSameVersion(t *testing.T) {
// Create a cancellable context that will shut down the coordinator after
// the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)
msgChan := make(chan string)
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
specs := component.RuntimeSpecs{}
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)

// Create and start the Coordinator
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

u := NewUpgrade(log, c)
ctx1 := context.Background()
ctx2 := context.Background()
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
err1 := u.Handle(ctx1, &a, ack)
err2 := u.Handle(ctx2, &a, ack)
ack := noopacker.New()
err1 := u.Handle(ctx, &a, ack)
err2 := u.Handle(ctx, &a, ack)
require.NoError(t, err1)
require.NoError(t, err2)
msg := <-msgChan
require.Equal(t, "completed 8.3.0", msg)
}

func TestUpgradeHandlerNewVersion(t *testing.T) {
// Create a cancellable context that will shut down the coordinator after
// the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)
msgChan := make(chan string)
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
specs := component.RuntimeSpecs{}
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)

// Create and start the Coordinator
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

u := NewUpgrade(log, c)
ctx1 := context.Background()
ctx2 := context.Background()
a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"}
a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"}
err1 := u.Handle(ctx1, &a1, ack)
ack := noopacker.New()
err1 := u.Handle(ctx, &a1, ack)
require.NoError(t, err1)
time.Sleep(1 * time.Second)
err2 := u.Handle(ctx2, &a2, ack)
err2 := u.Handle(ctx, &a2, ack)
require.NoError(t, err2)
msg1 := <-msgChan
require.Equal(t, "canceled 8.2.0", msg1)
Expand Down
Loading