Skip to content

Commit

Permalink
Create non-blocking broadcaster helper and use it to manage Coordinat…
Browse files Browse the repository at this point in the history
…or state notifications (#2849)
  • Loading branch information
faec authored Jun 26, 2023
1 parent 6b6bb42 commit 729636a
Show file tree
Hide file tree
Showing 37 changed files with 2,722 additions and 10,273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"fmt"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator/state"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
Expand Down Expand Up @@ -171,7 +169,7 @@ func readMapString(m map[string]interface{}, key string, def string) string {
return def
}

func findUnitFromInputType(state state.State, inputType string) (component.Component, component.Unit, bool) {
func findUnitFromInputType(state coordinator.State, inputType string) (component.Component, component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput && unit.Config != nil && unit.Config.Type == inputType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
go func() {
h.log.Infof("starting upgrade to version %s in background", action.Version)
if err := h.coord.Upgrade(asyncCtx, action.Version, action.SourceURI, action, false); err != nil {

h.log.Errorf("upgrade to version %s failed: %v", action.Version, err)
// If context is cancelled in getAsyncContext, the actions are acked there
if !errors.Is(asyncCtx.Err(), context.Canceled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,59 +51,102 @@ func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error {
}

func TestUpgradeHandler(t *testing.T) {
// Create a cancellable context that will shut down the coordinator after
// the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)
msgChan := make(chan string)
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
specs := component.RuntimeSpecs{}
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)

// Create and start the coordinator
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

u := NewUpgrade(log, c)
ctx := context.Background()
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
ack := noopacker.New()
err := u.Handle(ctx, &a, ack)
require.NoError(t, err)
msg := <-msgChan
require.Equal(t, "completed 8.3.0", msg)
}

func TestUpgradeHandlerSameVersion(t *testing.T) {
// Create a cancellable context that will shut down the coordinator after
// the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)
msgChan := make(chan string)
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
specs := component.RuntimeSpecs{}
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)

// Create and start the Coordinator
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

u := NewUpgrade(log, c)
ctx1 := context.Background()
ctx2 := context.Background()
a := fleetapi.ActionUpgrade{Version: "8.3.0", SourceURI: "http://localhost"}
err1 := u.Handle(ctx1, &a, ack)
err2 := u.Handle(ctx2, &a, ack)
ack := noopacker.New()
err1 := u.Handle(ctx, &a, ack)
err2 := u.Handle(ctx, &a, ack)
require.NoError(t, err1)
require.NoError(t, err2)
msg := <-msgChan
require.Equal(t, "completed 8.3.0", msg)
}

func TestUpgradeHandlerNewVersion(t *testing.T) {
// Create a cancellable context that will shut down the coordinator after
// the test.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log, _ := logger.New("", false)
ack := noopacker.New()
agentInfo, _ := info.NewAgentInfo(true)
msgChan := make(chan string)
upgradeMgr := &mockUpgradeManager{msgChan: msgChan}
specs := component.RuntimeSpecs{}
c := coordinator.New(log, configuration.DefaultConfiguration(), logger.DefaultLogLevel, agentInfo, specs, nil, upgradeMgr, nil, nil, nil, nil, nil, false)

// Create and start the Coordinator
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)

u := NewUpgrade(log, c)
ctx1 := context.Background()
ctx2 := context.Background()
a1 := fleetapi.ActionUpgrade{Version: "8.2.0", SourceURI: "http://localhost"}
a2 := fleetapi.ActionUpgrade{Version: "8.5.0", SourceURI: "http://localhost"}
err1 := u.Handle(ctx1, &a1, ack)
ack := noopacker.New()
err1 := u.Handle(ctx, &a1, ack)
require.NoError(t, err1)
time.Sleep(1 * time.Second)
err2 := u.Handle(ctx2, &a2, ack)
err2 := u.Handle(ctx, &a2, ack)
require.NoError(t, err2)
msg1 := <-msgChan
require.Equal(t, "canceled 8.2.0", msg1)
Expand Down
Loading

0 comments on commit 729636a

Please sign in to comment.