Skip to content

Commit

Permalink
Merge branch 'main' into 5434-fix-state-store-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Sep 5, 2024
2 parents 7e42fe3 + a172479 commit 4f67b2a
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .buildkite/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ if [[ -z "${WORKSPACE-""}" ]]; then
export WORKSPACE
fi
if [[ -z "${SETUP_MAGE_VERSION-""}" ]]; then
SETUP_MAGE_VERSION="1.14.0"
SETUP_MAGE_VERSION="1.15.0"
fi
if [[ -z "${SETUP_GVM_VERSION-""}" ]]; then
SETUP_GVM_VERSION="v0.5.0" # https://github.com/andrewkroh/gvm/issues/44#issuecomment-1013231151
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ COVERAGE_DIR=$(BUILD_DIR)/coverage
BEATS?=elastic-agent
PROJECTS= $(BEATS)
PYTHON_ENV?=$(BUILD_DIR)/python-env
MAGE_VERSION ?= v1.14.0
MAGE_VERSION ?= v1.15.0
MAGE_PRESENT := $(shell mage --version 2> /dev/null | grep $(MAGE_VERSION))
MAGE_IMPORT_PATH ?= github.com/magefile/mage
export MAGE_IMPORT_PATH
Expand Down
79 changes: 41 additions & 38 deletions internal/pkg/agent/application/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,41 +231,38 @@ func TestActionDispatcher(t *testing.T) {
})

t.Run("Cancel queued action", func(t *testing.T) {
def := &mockHandler{}
calledCh := make(chan bool)
call := def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
call.RunFn = func(_ mock.Arguments) {
calledCh <- true
}

queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.ScheduledAction{}).Once()

d, err := New(nil, t.TempDir(), def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action := &mockAction{}
action.On("Type").Return(fleetapi.ActionTypeCancel)
action.On("ID").Return("id")

dispatchCtx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
go d.Dispatch(dispatchCtx, detailsSetter, ack, action)

def := &mockHandler{}
def.On("Handle", dispatchCtx, action, ack).Return(nil).Once()

d, err := New(nil, t.TempDir(), def, queue)
require.NoError(t, err)

dispatchCompleted := make(chan struct{})
go func() {
d.Dispatch(dispatchCtx, detailsSetter, ack, action)
dispatchCompleted <- struct{}{}
}()

select {
case err := <-d.Errors():
t.Fatalf("Unexpected error: %v", err)
case <-calledCh:
// Handle was called, expected
case <-time.After(1 * time.Second):
t.Fatal("mock Handle never called")
case <-dispatchCompleted:
// OK, expected to complete the dispatch without blocking on the errors channel
}

def.AssertExpectations(t)
// Flaky assertion: https://github.com/elastic/elastic-agent/issues/3137
// TODO: re-enabled when fixed
// queue.AssertExpectations(t)
queue.AssertExpectations(t)
})

t.Run("Retrieve actions from queue", func(t *testing.T) {
Expand Down Expand Up @@ -360,43 +357,49 @@ func TestActionDispatcher(t *testing.T) {
action.AssertExpectations(t)
})

t.Run("Dispatch multiples events returns one error", func(t *testing.T) {
def := &mockHandler{}
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test error")).Once()
def.On("Handle", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()

t.Run("Dispatch multiple events returns one error", func(t *testing.T) {
queue := &mockQueue{}
queue.On("Save").Return(nil).Once()
queue.On("DequeueActions").Return([]fleetapi.ScheduledAction{}).Once()

d, err := New(nil, t.TempDir(), def, queue)
require.NoError(t, err)
err = d.Register(&mockAction{}, def)
require.NoError(t, err)

action1 := &mockAction{}
action1.On("Type").Return("action")
action1.On("ID").Return("id")

action2 := &mockAction{}
action2.On("Type").Return("action")
action2.On("ID").Return("id")

// Kind of a dirty workaround to test an error return:
// launch in another goroutine and sleep to check if an error is generated.
dispatchCtx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
go d.Dispatch(dispatchCtx, detailsSetter, ack, action1, action2)
time.Sleep(time.Millisecond * 200)

def := &mockHandler{}
def.On("Handle", dispatchCtx, action1, ack).Return(errors.New("first error")).Once()
def.On("Handle", dispatchCtx, action2, ack).Return(errors.New("second error")).Once()

d, err := New(nil, t.TempDir(), def, queue)
require.NoError(t, err)

dispatchCompleted := make(chan struct{})
go func() {
d.Dispatch(dispatchCtx, detailsSetter, ack, action1, action2)
dispatchCompleted <- struct{}{}
}()

// First, assert that the Dispatch method puts one error - the second one - on the error channel.
select {
case <-d.Errors():
default:
case err := <-d.Errors():
assert.EqualError(t, err, "second error")
case <-dispatchCompleted:
t.Fatal("Expected error")
}
time.Sleep(time.Millisecond * 200)

// Second, assert that the Dispatch method completes without putting anything else on the error channel.
select {
case <-d.Errors():
t.Fatal(err)
default:
case <-dispatchCompleted:
// Expecting the dispatch to complete.
}

def.AssertExpectations(t)
Expand Down
16 changes: 10 additions & 6 deletions internal/pkg/agent/application/managed_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func Test_runDispatcher(t *testing.T) {
name string
mockGateway func(chan []fleetapi.Action) *mockGateway
mockDispatcher func() *mockDispatcher
interval time.Duration
flushInterval time.Duration
contextTimeout time.Duration
skipOnWindowsReason string
}{{
name: "dispatcher not called",
Expand All @@ -87,7 +88,8 @@ func Test_runDispatcher(t *testing.T) {
dispatcher := &mockDispatcher{}
return dispatcher
},
interval: time.Second,
flushInterval: time.Second,
contextTimeout: time.Millisecond * 100,
}, {
name: "gateway actions passed",
mockGateway: func(ch chan []fleetapi.Action) *mockGateway {
Expand All @@ -101,7 +103,8 @@ func Test_runDispatcher(t *testing.T) {
dispatcher.On("Dispatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Once()
return dispatcher
},
interval: time.Second,
flushInterval: time.Second,
contextTimeout: time.Millisecond * 200,
}, {
name: "no gateway actions, dispatcher is flushed",
mockGateway: func(ch chan []fleetapi.Action) *mockGateway {
Expand All @@ -115,7 +118,8 @@ func Test_runDispatcher(t *testing.T) {
dispatcher.On("Dispatch", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() // allow a second call in case there are timing issues in the CI pipeline
return dispatcher
},
interval: time.Millisecond * 60,
flushInterval: time.Millisecond * 60,
contextTimeout: time.Millisecond * 100,
}}

for _, tc := range tests {
Expand All @@ -130,9 +134,9 @@ func Test_runDispatcher(t *testing.T) {
detailsSetter := func(upgradeDetails *details.Details) {}
acker := &mockAcker{}

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout)
defer cancel()
runDispatcher(ctx, dispatcher, gateway, detailsSetter, acker, tc.interval)
runDispatcher(ctx, dispatcher, gateway, detailsSetter, acker, tc.flushInterval)
assert.Empty(t, ch)

gateway.AssertExpectations(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/storage/store/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *StateStore) SetAction(a fleetapi.Action) {
// the reflect.ValueOf(v).IsNil() is required as the type of 'v' on switch
// clause with multiple types will, in this case, preserve the original type.
// See details on https://go.dev/ref/spec#Type_switches
// Without using reflect accessing the concret type stored in the interface
// Without using reflect accessing the concrete type stored in the interface
// isn't possible and as a is of type fleetapi.Action and has a concrete
// value, a is never nil, neither v is nil as it has the same type of a
// on both clauses.
Expand Down
58 changes: 49 additions & 9 deletions testing/integration/package_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -102,23 +103,49 @@ func TestComponentBuildHashInDiagnostics(t *testing.T) {
return false
}

if client.State(status.State) != client.Healthy {
stateBuff.WriteString(fmt.Sprintf(
"agent isn't healthy: %s-%s",
client.State(status.State), status.Message))
return false
}

if len(status.Components) == 0 {
stateBuff.WriteString(fmt.Sprintf(
"healthy but without components: agent status: %s-%s",
client.State(status.State), status.Message))
return false
}

// the agent might be healthy but waiting for its first configuration,
// in that case, there would be no components yet. Therefore, ensure
// the agent received the policy with components before proceeding with
// the test.
for _, c := range status.Components {
bs, err := json.MarshalIndent(status, "", " ")
if err != nil {
stateBuff.WriteString(fmt.Sprintf("%s not health, could not marshal status outptu: %v",
stateBuff.WriteString(fmt.Sprintf(
"%s not healthy, could not marshal status outptu: %v",
c.Name, err))
return false
}

state := client.State(c.State)
if state != client.Healthy {
stateBuff.WriteString(fmt.Sprintf("%s not health, agent status output: %s",
stateBuff.WriteString(fmt.Sprintf(
"%s not health, agent status output: %s",
c.Name, bs))
return false
}

// there is a rare race condition, unlikely to happen in a
// production scenario, where the component is healthy but the
// version info is slow to update. As the Status command and the
// diagnostics fetch this information in the same way, this check guarantees
// the version info is up-to-date before proceeding with the test.
if c.VersionInfo.Meta.Commit == "" {
stateBuff.WriteString(fmt.Sprintf("%s health, but no versionInfo. agent status output: %s",
stateBuff.WriteString(fmt.Sprintf(
"%s health, but no versionInfo. agent status output: %s",
c.Name, bs))
return false
}
Expand All @@ -130,13 +157,13 @@ func TestComponentBuildHashInDiagnostics(t *testing.T) {
allHealthy,
5*time.Minute, 10*time.Second,
"agent never became healthy. Last status: %v", &stateBuff)
t.Cleanup(func() {
defer func() {
if !t.Failed() {
return
}

t.Logf("test failed: last status output: %v", status)
})
t.Logf("test failed: last status output: %#v", status)
}()

agentbeat := "agentbeat"
if runtime.GOOS == "windows" {
Expand Down Expand Up @@ -173,8 +200,8 @@ func TestComponentBuildHashInDiagnostics(t *testing.T) {

diag := t.TempDir()
extractZipArchive(t, diagZip, diag)
// the test is flaky, so collecting some data to analyze later.
t.Cleanup(func() {
// if the test fails, the collected diagnostics are useful for debugging.
defer func() {
if !t.Failed() {
return
}
Expand All @@ -194,7 +221,7 @@ func TestComponentBuildHashInDiagnostics(t *testing.T) {
diagDir, err)
return
}
})
}()

stateFilePath := filepath.Join(diag, "state.yaml")
stateYAML, err := os.Open(stateFilePath)
Expand Down Expand Up @@ -228,6 +255,19 @@ func TestComponentBuildHashInDiagnostics(t *testing.T) {
assert.Equalf(t, wantBuildHash, c.State.VersionInfo.Meta.Commit,
"component %s: VersionInfo.Meta.Commit mismatch", c.ID)
}

if t.Failed() {
_, seek := stateYAML.Seek(0, 0)
if seek != nil {
t.Logf("could not reset state.yaml offset to print it")
return
}
data, err := io.ReadAll(stateYAML)
if err != nil {
t.Logf("could not read state.yaml: %v", err)
}
t.Logf("test failed: state.yaml contents: %q", string(data))
}
}

func testVersionWithRunningAgent(runCtx context.Context, f *atesting.Fixture) func(*testing.T) {
Expand Down

0 comments on commit 4f67b2a

Please sign in to comment.