From a7b7bcae164972641a618b7152bf9285d28620b3 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Thu, 7 Mar 2024 10:59:03 +0100 Subject: [PATCH] refactor state store (#4253) It modifies the state store API to match the current needs. --- .../handlers/handler_action_unenroll.go | 6 +- .../gateway/fleet/fleet_gateway.go | 2 - .../pkg/agent/application/managed_mode.go | 15 +- internal/pkg/agent/cmd/run.go | 4 +- .../pkg/agent/storage/store/action_store.go | 14 +- .../pkg/agent/storage/store/state_store.go | 368 +++++++++--------- .../agent/storage/store/state_store_test.go | 148 ++++--- internal/pkg/config/operations/inspector.go | 12 +- internal/pkg/fleetapi/action.go | 64 +-- internal/pkg/queue/actionqueue.go | 20 +- internal/pkg/queue/actionqueue_test.go | 10 +- 11 files changed, 325 insertions(+), 338 deletions(-) diff --git a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go index 0fb9acb8da7..4b4c9c4dfdb 100644 --- a/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go +++ b/internal/pkg/agent/application/actions/handlers/handler_action_unenroll.go @@ -22,11 +22,11 @@ const ( ) type stateStore interface { - Add(fleetapi.Action) + SetAction(fleetapi.Action) AckToken() string SetAckToken(ackToken string) Save() error - Actions() []fleetapi.Action + Action() fleetapi.Action } // Unenroll results in running agent entering idle state, non managed non standalone. @@ -94,7 +94,7 @@ func (h *Unenroll) Handle(ctx context.Context, a fleetapi.Action, acker acker.Ac if h.stateStore != nil { // backup action for future start to avoid starting fleet gateway loop - h.stateStore.Add(a) + h.stateStore.SetAction(a) if err := h.stateStore.Save(); err != nil { h.log.Warnf("Failed to update state store: %v", err) } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index 109ece58be9..2421f0e7792 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -58,11 +58,9 @@ type agentInfo interface { } type stateStore interface { - Add(fleetapi.Action) AckToken() string SetAckToken(ackToken string) Save() error - Actions() []fleetapi.Action } type FleetGateway struct { diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 7f8d22a16ca..f940641c0d1 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -157,14 +157,14 @@ func (m *managedConfigManager) Run(ctx context.Context) error { close(retrierRun) }() - actions := m.stateStore.Actions() + action := m.stateStore.Action() stateRestored := false - if len(actions) > 0 && !m.wasUnenrolled() { + if action != nil && !m.wasUnenrolled() { // TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a // persisted action on disk we should be able to ask Fleet to get the latest configuration. // But at the moment this is not possible because the policy change was acked. m.log.Info("restoring current policy from disk") - m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, actions...) + m.dispatcher.Dispatch(ctx, m.coord.SetUpgradeDetails, actionAcker, action) stateRestored = true } @@ -268,13 +268,8 @@ func (m *managedConfigManager) Watch() <-chan coordinator.ConfigChange { } func (m *managedConfigManager) wasUnenrolled() bool { - actions := m.stateStore.Actions() - for _, a := range actions { - if a.Type() == "UNENROLL" { - return true - } - } - return false + return m.stateStore.Action() != nil && + m.stateStore.Action().Type() == fleetapi.ActionTypeUnenroll } func (m *managedConfigManager) initFleetServer(ctx context.Context, cfg *configuration.FleetServerConfig) error { diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index ae99ba108a9..00ebe186d6e 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -200,7 +200,9 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf } // the encrypted state does not exist but the unencrypted file does - err = migration.MigrateToEncryptedConfig(ctx, l, paths.AgentStateStoreYmlFile(), paths.AgentStateStoreFile()) + err = migration.MigrateToEncryptedConfig(ctx, l, + paths.AgentStateStoreYmlFile(), + paths.AgentStateStoreFile()) if err != nil { return errors.New(err, "error migrating agent state") } diff --git a/internal/pkg/agent/storage/store/action_store.go b/internal/pkg/agent/storage/store/action_store.go index 9f40dd678e3..dd7f839d846 100644 --- a/internal/pkg/agent/storage/store/action_store.go +++ b/internal/pkg/agent/storage/store/action_store.go @@ -23,13 +23,13 @@ import ( // Deprecated. type actionStore struct { log *logger.Logger - store storeLoad + store saveLoader dirty bool - action action + action fleetapi.Action } // newActionStore creates a new action store. -func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) { +func newActionStore(log *logger.Logger, store saveLoader) (*actionStore, error) { // If the store exists we will read it, if an error is returned we log it // and return an empty store. reader, err := store.Load() @@ -64,7 +64,7 @@ func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) { // add is only taking care of ActionPolicyChange for now and will only keep the last one it receive, // any other type of action will be silently ignored. -func (s *actionStore) add(a action) { +func (s *actionStore) add(a fleetapi.Action) { switch v := a.(type) { case *fleetapi.ActionPolicyChange, *fleetapi.ActionUnenroll: // Only persist the action if the action is different. @@ -117,12 +117,12 @@ func (s *actionStore) save() error { // actions returns a slice of action to execute in order, currently only a action policy change is // persisted. -func (s *actionStore) actions() []action { +func (s *actionStore) actions() fleetapi.Actions { if s.action == nil { - return []action{} + return fleetapi.Actions{} } - return []action{s.action} + return fleetapi.Actions{s.action} } // actionPolicyChangeSerializer is a struct that adds a YAML serialization, I don't think serialization diff --git a/internal/pkg/agent/storage/store/state_store.go b/internal/pkg/agent/storage/store/state_store.go index e11837d7f42..e3feb23103d 100644 --- a/internal/pkg/agent/storage/store/state_store.go +++ b/internal/pkg/agent/storage/store/state_store.go @@ -14,23 +14,24 @@ import ( "sync" "github.com/elastic/elastic-agent/internal/pkg/agent/storage" - "github.com/elastic/elastic-agent/internal/pkg/conv" "github.com/elastic/elastic-agent/internal/pkg/fleetapi" "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker" "github.com/elastic/elastic-agent/pkg/core/logger" ) -type store interface { +// Version is the current StateStore version. If any breaking change is +// introduced, it should be increased and a migration added. +const Version = "1" + +type saver interface { Save(io.Reader) error } -type storeLoad interface { - store +type saveLoader interface { + saver Load() (io.ReadCloser, error) } -type action = fleetapi.Action - // StateStore is a combined agent state storage initially derived from the former actionStore // and modified to allow persistence of additional agent specific state information. // The following is the original actionStore implementation description: @@ -40,7 +41,7 @@ type action = fleetapi.Action // Fleet. The store is not thread safe. type StateStore struct { log *logger.Logger - store storeLoad + store saveLoader dirty bool state state @@ -48,43 +49,75 @@ type StateStore struct { } type state struct { - action action - ackToken string - // TODO: the queue is for scheduled actions. Set its type accordingly. - queue []action + Version string `json:"version"` + ActionSerializer actionSerializer `json:"action,omitempty"` + AckToken string `json:"ack_token,omitempty"` + Queue actionQueue `json:"action_queue,omitempty"` } -// actionSerializer is a combined yml serializer for the ActionPolicyChange and ActionUnenroll -// it is used to read the yaml file and assign the action to state.action as we must provide the -// underlying struct that provides the action interface. -// TODO: get rid of this type +// actionSerializer is JSON Marshaler/Unmarshaler for fleetapi.Action. type actionSerializer struct { - ID string `json:"action_id"` - Type string `json:"action_type"` - Data actionDataSerializer `json:"data,omitempty"` - IsDetected *bool `json:"is_detected,omitempty"` - Signed *fleetapi.Signed `json:"signed,omitempty"` + json.Marshaler + json.Unmarshaler + + Action fleetapi.Action } -type actionDataSerializer struct { - Policy map[string]interface{} `json:"policy" yaml:"policy,omitempty"` +// actionQueue stores scheduled actions to be executed and the type is needed +// to make it possible to marshal and unmarshal fleetapi.ScheduledActions. +// The fleetapi package marshal/unmarshal fleetapi.Actions, therefore it does +// not need to handle fleetapi.ScheduledAction separately. However, the store does, +// therefore the need for this type to do so. +type actionQueue []fleetapi.ScheduledAction + +func (as *actionSerializer) MarshalJSON() ([]byte, error) { + return json.Marshal(as.Action) } -// stateSerializer is used to serialize the state to yaml. -// action serialization is handled through the actionSerializer struct -// queue serialization is handled through yaml struct tags or the actions unmarshaller defined in fleetapi -// TODO clean up action serialization (have it be part of the fleetapi?) -type stateSerializer struct { - Action *actionSerializer `json:"action,omitempty"` - AckToken string `json:"ack_token,omitempty"` - Queue fleetapi.Actions `json:"action_queue,omitempty"` +func (as *actionSerializer) UnmarshalJSON(data []byte) error { + var typeUnmarshaler struct { + Type string `json:"type,omitempty" yaml:"type,omitempty"` + } + err := json.Unmarshal(data, &typeUnmarshaler) + if err != nil { + return err + } + + as.Action = fleetapi.NewAction(typeUnmarshaler.Type) + err = json.Unmarshal(data, &as.Action) + if err != nil { + return err + } + + return nil +} + +func (aq *actionQueue) UnmarshalJSON(data []byte) error { + actions := fleetapi.Actions{} + err := json.Unmarshal(data, &actions) + if err != nil { + return fmt.Errorf("actionQueue failed to unmarshal: %w", err) + } + + var scheduledActions []fleetapi.ScheduledAction + for _, a := range actions { + sa, ok := a.(fleetapi.ScheduledAction) + if !ok { + return fmt.Errorf("actionQueue: action %s isn't a ScheduledAction,"+ + "cannot unmarshal it to actionQueue", a.Type()) + } + scheduledActions = append(scheduledActions, sa) + } + + *aq = scheduledActions + return nil } // NewStateStoreWithMigration creates a new state store and migrates the old one. func NewStateStoreWithMigration(ctx context.Context, log *logger.Logger, actionStorePath, stateStorePath string) (*StateStore, error) { stateDiskStore := storage.NewEncryptedDiskStore(ctx, stateStorePath) - err := migrateStateStore(log, actionStorePath, stateDiskStore) + err := migrateActionStoreToStateStore(log, actionStorePath, stateDiskStore) if err != nil { return nil, err } @@ -98,7 +131,7 @@ func NewStateStoreActionAcker(acker acker.Acker, store *StateStore) *StateStoreA } // NewStateStore creates a new state store. -func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { +func NewStateStore(log *logger.Logger, store saveLoader) (*StateStore, error) { // If the store exists we will read it, if an error is returned we log it // and return an empty store. reader, err := store.Load() @@ -108,46 +141,24 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { } defer reader.Close() - var serializer stateSerializer - + st := state{} dec := json.NewDecoder(reader) - err = dec.Decode(&serializer) + err = dec.Decode(&st) if errors.Is(err, io.EOF) { return &StateStore{ log: log, store: store, + state: state{Version: Version}, }, nil } - if err != nil { - return nil, err + return nil, fmt.Errorf("could not JSON unmarshal state store: %w", err) } - st := state{ - ackToken: serializer.AckToken, - queue: serializer.Queue, - } - - if serializer.Action != nil { - // TODO: use ActionType instead - if serializer.Action.IsDetected != nil { - st.action = &fleetapi.ActionUnenroll{ - ActionID: serializer.Action.ID, - ActionType: serializer.Action.Type, - IsDetected: *serializer.Action.IsDetected, - Signed: serializer.Action.Signed, - } - } else { - st.action = &fleetapi.ActionPolicyChange{ - ActionID: serializer.Action.ID, - ActionType: serializer.Action.Type, - Data: fleetapi.ActionPolicyChangeData{ - // Fix Policy, in order to make it consistent with the policy - // received from the fleet gateway as nested map[string]interface{} - Policy: conv.YAMLMapToJSONMap(serializer.Action.Data.Policy), - }, - } - } + if st.Version != Version { + return nil, fmt.Errorf( + "invalid state store version, got %q isntead of %s", + st.Version, Version) } return &StateStore{ @@ -157,94 +168,21 @@ func NewStateStore(log *logger.Logger, store storeLoad) (*StateStore, error) { }, nil } -func migrateStateStore( - log *logger.Logger, - actionStorePath string, - stateDiskStore storage.Storage) (err error) { - - log = log.Named("state_migration") - actionDiskStore := storage.NewDiskStore(actionStorePath) - - stateStoreExits, err := stateDiskStore.Exists() - if err != nil { - log.Errorf("failed to check if state store exists: %v", err) - return err - } - - // do not migrate if the state store already exists - if stateStoreExits { - log.Debugf("state store already exists") - return nil - } - - actionStoreExits, err := actionDiskStore.Exists() - if err != nil { - log.Errorf("failed to check if action store %s exists: %v", actionStorePath, err) - return err - } - - // delete the actions store file upon successful migration - defer func() { - if err == nil && actionStoreExits { - err = actionDiskStore.Delete() - if err != nil { - log.Errorf("failed to delete action store %s exists: %v", actionStorePath, err) - } - } - }() - - // nothing to migrate if the action store doesn't exists - if !actionStoreExits { - log.Debugf("action store %s doesn't exists, nothing to migrate", actionStorePath) - return nil - } - - actionStore, err := newActionStore(log, actionDiskStore) - if err != nil { - log.Errorf("failed to create action store %s: %v", actionStorePath, err) - return err - } - - // no actions stored nothing to migrate - if len(actionStore.actions()) == 0 { - log.Debugf("no actions stored in the action store %s, nothing to migrate", actionStorePath) - return nil - } - - stateStore, err := NewStateStore(log, stateDiskStore) - if err != nil { - return err - } - - // set actions from the action store to the state store - stateStore.Add(actionStore.actions()[0]) - - err = stateStore.Save() - if err != nil { - log.Debugf("failed to save agent state store, err: %v", err) - } - return err -} - -// Add is only taking care of ActionPolicyChange for now and will only keep the last one it receive, -// any other type of action will be silently ignored. -// TODO: fix docs: state: -// - the valid actions, -// - it silently discard invalid actions -// - perhaps rename it as it does not add to the queue, but sets the current -// action -func (s *StateStore) Add(a action) { +// SetAction sets the current action. It accepts ActionPolicyChange or +// ActionUnenroll. Any other type will be silently discarded. +func (s *StateStore) SetAction(a fleetapi.Action) { s.mx.Lock() defer s.mx.Unlock() switch v := a.(type) { case *fleetapi.ActionPolicyChange, *fleetapi.ActionUnenroll: // Only persist the action if the action is different. - if s.state.action != nil && s.state.action.ID() == v.ID() { + if s.state.ActionSerializer.Action != nil && + s.state.ActionSerializer.Action.ID() == v.ID() { return } s.dirty = true - s.state.action = a + s.state.ActionSerializer.Action = a } } @@ -253,20 +191,21 @@ func (s *StateStore) SetAckToken(ackToken string) { s.mx.Lock() defer s.mx.Unlock() - if s.state.ackToken == ackToken { + if s.state.AckToken == ackToken { return } s.dirty = true - s.state.ackToken = ackToken + s.state.AckToken = ackToken } // SetQueue sets the action_queue to agent state -func (s *StateStore) SetQueue(q []action) { +// TODO: receive only scheduled actions. It might break something. Needs to +// investigate it better. +func (s *StateStore) SetQueue(q []fleetapi.ScheduledAction) { s.mx.Lock() defer s.mx.Unlock() - s.state.queue = q + s.state.Queue = q s.dirty = true - } // Save saves the actions into a state store. @@ -280,34 +219,18 @@ func (s *StateStore) Save() error { } var reader io.Reader - serialize := stateSerializer{ - AckToken: s.state.ackToken, - Queue: s.state.queue, - } - if s.state.action != nil { - switch a := s.state.action.(type) { - case *fleetapi.ActionPolicyChange: - serialize.Action = &actionSerializer{ - ID: a.ActionID, - Type: a.ActionType, - Data: actionDataSerializer{ - Policy: a.Data.Policy, - }} - case *fleetapi.ActionUnenroll: - serialize.Action = &actionSerializer{ - ID: a.ActionID, - Type: a.ActionType, - IsDetected: &a.IsDetected, - Signed: a.Signed, - } - default: - return fmt.Errorf("incompatible type, expected ActionPolicyChange "+ - "or ActionUnenroll but received %T", s.state.action) - } + switch a := s.state.ActionSerializer.Action.(type) { + case *fleetapi.ActionPolicyChange, + *fleetapi.ActionUnenroll, + nil: + // ok + default: + return fmt.Errorf("incompatible type, expected ActionPolicyChange, "+ + "ActionUnenroll or nil, but received %T", a) } - reader, err := jsonToReader(&serialize) + reader, err := jsonToReader(&s.state) if err != nil { return err } @@ -320,49 +243,49 @@ func (s *StateStore) Save() error { } // Queue returns a copy of the queue -func (s *StateStore) Queue() []action { +func (s *StateStore) Queue() []fleetapi.ScheduledAction { s.mx.RLock() defer s.mx.RUnlock() - q := make([]action, len(s.state.queue)) - copy(q, s.state.queue) + q := make([]fleetapi.ScheduledAction, len(s.state.Queue)) + copy(q, s.state.Queue) return q } -// Actions returns a slice of action to execute in order, currently only a action policy change is -// persisted. -func (s *StateStore) Actions() []action { +// Action the action to execute. See SetAction for the possible action types. +func (s *StateStore) Action() fleetapi.Action { s.mx.RLock() defer s.mx.RUnlock() - if s.state.action == nil { - return []action{} + if s.state.ActionSerializer.Action == nil { + return nil } - return []action{s.state.action} + return s.state.ActionSerializer.Action } // AckToken return the agent state persisted ack_token func (s *StateStore) AckToken() string { s.mx.RLock() defer s.mx.RUnlock() - return s.state.ackToken + return s.state.AckToken } -// StateStoreActionAcker wraps an existing acker and will send any acked event to the action store, -// its up to the action store to decide if we need to persist the event for future replay or just -// discard the event. +// StateStoreActionAcker wraps an existing acker and will set any acked event +// in the state store. It's up to the state store to decide if we need to +// persist the event for future replay or just discard the event. type StateStoreActionAcker struct { acker acker.Acker store *StateStore } -// Ack acks action using underlying acker. -// After action is acked it is stored to backing store. +// Ack acks the action using underlying acker. +// After the action is acked it is stored in the StateStore. The StateStore +// decides if the action needs to be persisted or not. func (a *StateStoreActionAcker) Ack(ctx context.Context, action fleetapi.Action) error { if err := a.acker.Ack(ctx, action); err != nil { return err } - a.store.Add(action) + a.store.SetAction(action) return a.store.Save() } @@ -371,10 +294,79 @@ func (a *StateStoreActionAcker) Commit(ctx context.Context) error { return a.acker.Commit(ctx) } +func migrateActionStoreToStateStore( + log *logger.Logger, + actionStorePath string, + stateDiskStore storage.Storage) (err error) { + + log = log.Named("state_migration") + actionDiskStore := storage.NewDiskStore(actionStorePath) + + stateStoreExits, err := stateDiskStore.Exists() + if err != nil { + log.Errorf("failed to check if state store exists: %v", err) + return err + } + + // do not migrate if the state store already exists + if stateStoreExits { + log.Debugf("state store already exists") + return nil + } + + actionStoreExits, err := actionDiskStore.Exists() + if err != nil { + log.Errorf("failed to check if action store %s exists: %v", actionStorePath, err) + return err + } + + // delete the actions store file upon successful migration + defer func() { + if err == nil && actionStoreExits { + err = actionDiskStore.Delete() + if err != nil { + log.Errorf("failed to delete action store %s exists: %v", actionStorePath, err) + } + } + }() + + // nothing to migrate if the action store doesn't exists + if !actionStoreExits { + log.Debugf("action store %s doesn't exists, nothing to migrate", actionStorePath) + return nil + } + + actionStore, err := newActionStore(log, actionDiskStore) + if err != nil { + log.Errorf("failed to create action store %s: %v", actionStorePath, err) + return err + } + + // no actions stored nothing to migrate + if len(actionStore.actions()) == 0 { + log.Debugf("no actions stored in the action store %s, nothing to migrate", actionStorePath) + return nil + } + + stateStore, err := NewStateStore(log, stateDiskStore) + if err != nil { + return err + } + + // set actions from the action store to the state store + stateStore.SetAction(actionStore.actions()[0]) + + err = stateStore.Save() + if err != nil { + log.Debugf("failed to save agent state store, err: %v", err) + } + return err +} + func jsonToReader(in interface{}) (io.Reader, error) { data, err := json.Marshal(in) if err != nil { - return nil, fmt.Errorf("could not marshal to YAML: %w", err) + return nil, fmt.Errorf("could not marshal to JSON: %w", err) } return bytes.NewReader(data), nil } diff --git a/internal/pkg/agent/storage/store/state_store_test.go b/internal/pkg/agent/storage/store/state_store_test.go index 0b5a07c5de3..2ffd0746af9 100644 --- a/internal/pkg/agent/storage/store/state_store_test.go +++ b/internal/pkg/agent/storage/store/state_store_test.go @@ -46,7 +46,7 @@ func runTestStateStore(t *testing.T, ackToken string) { s := storage.NewDiskStore(storePath) store, err := NewStateStore(log, s) require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.Empty(t, store.Queue()) }) @@ -60,12 +60,12 @@ func runTestStateStore(t *testing.T, ackToken string) { store, err := NewStateStore(log, s) require.NoError(t, err) - require.Equal(t, 0, len(store.Actions())) - store.Add(actionPolicyChange) + require.Nil(t, store.Action()) + store.SetAction(actionPolicyChange) store.SetAckToken(ackToken) err = store.Save() require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.Empty(t, store.Queue()) require.Equal(t, ackToken, store.AckToken()) }) @@ -85,13 +85,13 @@ func runTestStateStore(t *testing.T, ackToken string) { store, err := NewStateStore(log, s) require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.Empty(t, store.Queue()) - store.Add(ActionPolicyChange) + store.SetAction(ActionPolicyChange) store.SetAckToken(ackToken) err = store.Save() require.NoError(t, err) - require.Len(t, store.Actions(), 1) + require.NotNil(t, store.Action(), "store should have an action stored") require.Empty(t, store.Queue()) require.Equal(t, ackToken, store.AckToken()) @@ -99,17 +99,17 @@ func runTestStateStore(t *testing.T, ackToken string) { store1, err := NewStateStore(log, s) require.NoError(t, err) - actions := store1.Actions() - require.Len(t, actions, 1) + action := store1.Action() + require.NotNil(t, action, "store should have an action stored") require.Empty(t, store1.Queue()) - require.Equal(t, ActionPolicyChange, actions[0]) + require.Equal(t, ActionPolicyChange, action) require.Equal(t, ackToken, store.AckToken()) }) t.Run("can save a queue with one upgrade action", func(t *testing.T) { ts := time.Now().UTC().Round(time.Second) - queue := []action{&fleetapi.ActionUpgrade{ + queue := []fleetapi.ScheduledAction{&fleetapi.ActionUpgrade{ ActionID: "test", ActionType: fleetapi.ActionTypeUpgrade, ActionStartTime: ts.Format(time.RFC3339), @@ -123,29 +123,29 @@ func runTestStateStore(t *testing.T, ackToken string) { store, err := NewStateStore(log, s) require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) store.SetQueue(queue) err = store.Save() require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.Len(t, store.Queue(), 1) s = storage.NewDiskStore(storePath) - store1, err := NewStateStore(log, s) - require.NoError(t, err) - require.Empty(t, store1.Actions()) - require.Len(t, store1.Queue(), 1) - require.Equal(t, "test", store1.Queue()[0].ID()) - scheduledAction, ok := store1.Queue()[0].(fleetapi.ScheduledAction) - require.True(t, ok, "expected to be able to cast Action as ScheduledAction") - start, err := scheduledAction.StartTime() + store, err = NewStateStore(log, s) require.NoError(t, err) - require.Equal(t, ts, start) + + assert.Nil(t, store.Action()) + assert.Len(t, store.Queue(), 1) + assert.Equal(t, "test", store.Queue()[0].ID()) + + start, err := store.Queue()[0].StartTime() + assert.NoError(t, err) + assert.Equal(t, ts, start) }) t.Run("can save a queue with two actions", func(t *testing.T) { ts := time.Now().UTC().Round(time.Second) - queue := []action{&fleetapi.ActionUpgrade{ + queue := []fleetapi.ScheduledAction{&fleetapi.ActionUpgrade{ ActionID: "test", ActionType: fleetapi.ActionTypeUpgrade, ActionStartTime: ts.Format(time.RFC3339), @@ -153,50 +153,49 @@ func runTestStateStore(t *testing.T, ackToken string) { Version: "1.2.3", SourceURI: "https://example.com", Retry: 1, - }}, &fleetapi.ActionPolicyChange{ - ActionID: "abc123", - ActionType: "POLICY_CHANGE", - Data: fleetapi.ActionPolicyChangeData{ - Policy: map[string]interface{}{ - "hello": "world", - }}, - }} + }}, + // only the latest upgrade action is kept, however it's not the store + // which handled that. Besides upgrade actions are the only + // ScheduledAction right now, so it'll use 2 of them for this test. + &fleetapi.ActionUpgrade{ + ActionID: "test2", + ActionType: fleetapi.ActionTypeUpgrade, + ActionStartTime: ts.Format(time.RFC3339), + Data: fleetapi.ActionUpgradeData{ + Version: "1.2.4", + SourceURI: "https://example.com", + Retry: 1, + }}} storePath := filepath.Join(t.TempDir(), "state.yml") s := storage.NewDiskStore(storePath) store, err := NewStateStore(log, s) require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) store.SetQueue(queue) err = store.Save() require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.Len(t, store.Queue(), 2) + // Load state store from disk s = storage.NewDiskStore(storePath) - store1, err := NewStateStore(log, s) - require.NoError(t, err) - require.Empty(t, store1.Actions()) - require.Len(t, store1.Queue(), 2) + store, err = NewStateStore(log, s) + require.NoError(t, err, "could not load store from disk") - require.Equal(t, "test", store1.Queue()[0].ID()) - scheduledAction, ok := store1.Queue()[0].(fleetapi.ScheduledAction) - require.True(t, ok, "expected to be able to cast Action as ScheduledAction") - start, err := scheduledAction.StartTime() - require.NoError(t, err) - require.Equal(t, ts, start) - retryableAction, ok := store1.Queue()[0].(fleetapi.RetryableAction) - require.True(t, ok, "expected to be able to cast Action as RetryableAction") - require.Equal(t, 1, retryableAction.RetryAttempt()) - - require.Equal(t, "abc123", store1.Queue()[1].ID()) - _, ok = store1.Queue()[1].(fleetapi.ScheduledAction) - require.False(t, ok, "expected cast to ScheduledAction to fail") + got := store.Queue() + for i, want := range queue { + upgradeAction, ok := got[i].(*fleetapi.ActionUpgrade) + assert.True(t, ok, + "expected to be able to cast Action as ActionUpgrade") + + assert.Equal(t, want, upgradeAction, "saved action is different from expected") + } }) t.Run("can save to disk unenroll action type", func(t *testing.T) { - action := &fleetapi.ActionUnenroll{ + want := &fleetapi.ActionUnenroll{ ActionID: "abc123", ActionType: "UNENROLL", } @@ -206,13 +205,13 @@ func runTestStateStore(t *testing.T, ackToken string) { store, err := NewStateStore(log, s) require.NoError(t, err) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.Empty(t, store.Queue()) - store.Add(action) + store.SetAction(want) store.SetAckToken(ackToken) err = store.Save() require.NoError(t, err) - require.Len(t, store.Actions(), 1) + require.NotNil(t, store.Action(), "store should have an action stored") require.Empty(t, store.Queue()) require.Equal(t, ackToken, store.AckToken()) @@ -220,10 +219,10 @@ func runTestStateStore(t *testing.T, ackToken string) { store1, err := NewStateStore(log, s) require.NoError(t, err) - actions := store1.Actions() - require.Len(t, actions, 1) + got := store1.Action() + require.NotNil(t, got, "store should have an action stored") require.Empty(t, store1.Queue()) - require.Equal(t, action, actions[0]) + require.Equal(t, want, got) require.Equal(t, ackToken, store.AckToken()) }) @@ -239,10 +238,10 @@ func runTestStateStore(t *testing.T, ackToken string) { store.SetAckToken(ackToken) acker := NewStateStoreActionAcker(&testAcker{}, store) - require.Empty(t, store.Actions()) + require.Empty(t, store.Action()) require.NoError(t, acker.Ack(context.Background(), ActionPolicyChange)) - require.Len(t, store.Actions(), 1) + require.NotNil(t, store.Action(), "store should have an action stored") require.Empty(t, store.Queue()) require.Equal(t, ackToken, store.AckToken()) }) @@ -260,7 +259,7 @@ func runTestStateStore(t *testing.T, ackToken string) { newStateStorePath := filepath.Join(tempDir, "state_store.yml") newStateStore := storage.NewEncryptedDiskStore(ctx, newStateStorePath) - err := migrateStateStore(log, oldActionStorePath, newStateStore) + err := migrateActionStoreToStateStore(log, oldActionStorePath, newStateStore) require.NoError(t, err, "migration action store -> state store failed") // to load from disk a new store needs to be created, it loads the file @@ -268,7 +267,7 @@ func runTestStateStore(t *testing.T, ackToken string) { stateStore, err := NewStateStore(log, storage.NewDiskStore(newStateStorePath)) require.NoError(t, err) stateStore.SetAckToken(ackToken) - require.Empty(t, stateStore.Actions()) + require.Empty(t, stateStore.Action()) require.Equal(t, ackToken, stateStore.AckToken()) require.Empty(t, stateStore.Queue()) }) @@ -329,7 +328,7 @@ func runTestStateStore(t *testing.T, ackToken string) { newStateStorePath := filepath.Join(tempDir, "state_store.yaml") newStateStore := storage.NewEncryptedDiskStore(ctx, newStateStorePath, storage.WithVaultPath(vaultPath)) - err = migrateStateStore(log, oldActionStorePath, newStateStore) + err = migrateActionStoreToStateStore(log, oldActionStorePath, newStateStore) require.NoError(t, err, "migration action store -> state store failed") // to load from disk a new store needs to be created, it loads the file @@ -339,9 +338,8 @@ func runTestStateStore(t *testing.T, ackToken string) { stateStore, err := NewStateStore(log, newStateStore) require.NoError(t, err, "could not create state store") - actions := stateStore.Actions() - require.Len(t, actions, 1, "state store should load exactly 1 action") - got := actions[0] + got := stateStore.Action() + require.NotNil(t, got, "should have loaded an action") assert.Equalf(t, want, got, "loaded action differs from action on the old action store") @@ -369,7 +367,7 @@ func runTestStateStore(t *testing.T, ackToken string) { require.NoError(t, err, "could not create disk store") stateStore.SetAckToken(ackToken) - stateStore.Add(want) + stateStore.SetAction(want) err = stateStore.Save() require.NoError(t, err, "failed saving state store") @@ -378,10 +376,9 @@ func runTestStateStore(t *testing.T, ackToken string) { stateStore, err = NewStateStore(log, s) require.NoError(t, err, "could not create disk store") - actions := stateStore.Actions() - require.Len(t, actions, 1, - "should have loaded exactly 1 action") - got, ok := actions[0].(*fleetapi.ActionPolicyChange) + action := stateStore.Action() + require.NotNil(t, action, "should have loaded an action") + got, ok := action.(*fleetapi.ActionPolicyChange) require.True(t, ok, "could not cast action to fleetapi.ActionPolicyChange") assert.Equal(t, want, got) @@ -412,7 +409,7 @@ func runTestStateStore(t *testing.T, ackToken string) { require.NoError(t, err, "could not create disk store") stateStore.SetAckToken(ackToken) - stateStore.Add(want) + stateStore.SetAction(want) err = stateStore.Save() require.NoError(t, err, "failed saving state store") @@ -421,10 +418,9 @@ func runTestStateStore(t *testing.T, ackToken string) { stateStore, err = NewStateStore(log, s) require.NoError(t, err, "could not create disk store") - actions := stateStore.Actions() - require.Len(t, actions, 1, - "should have loaded exactly 1 action") - got, ok := actions[0].(*fleetapi.ActionUnenroll) + action := stateStore.Action() + require.NotNil(t, action, "should have loaded an action") + got, ok := action.(*fleetapi.ActionUnenroll) require.True(t, ok, "could not cast action to fleetapi.ActionUnenroll") assert.Equal(t, want, got) @@ -463,7 +459,7 @@ func runTestStateStore(t *testing.T, ackToken string) { require.NoError(t, err, "could not create disk store") stateStore.SetAckToken(ackToken) - stateStore.SetQueue([]action{want}) + stateStore.SetQueue([]fleetapi.ScheduledAction{want}) err = stateStore.Save() require.NoError(t, err, "failed saving state store") diff --git a/internal/pkg/config/operations/inspector.go b/internal/pkg/config/operations/inspector.go index ef4ab0ab32b..f43b4fd9488 100644 --- a/internal/pkg/config/operations/inspector.go +++ b/internal/pkg/config/operations/inspector.go @@ -113,18 +113,16 @@ func loadConfig(ctx context.Context, configPath string) (*config.Config, error) } func loadFleetConfig(ctx context.Context, l *logger.Logger) (map[string]interface{}, error) { - stateStore, err := store.NewStateStoreWithMigration(ctx, l, paths.AgentActionStoreFile(), paths.AgentStateStoreFile()) + stateStore, err := store.NewStateStoreWithMigration( + ctx, l, paths.AgentActionStoreFile(), paths.AgentStateStoreFile()) if err != nil { return nil, err } - for _, c := range stateStore.Actions() { - cfgChange, ok := c.(*fleetapi.ActionPolicyChange) - if !ok { - continue - } - + cfgChange, ok := stateStore.Action().(*fleetapi.ActionPolicyChange) + if ok { return cfgChange.Data.Policy, nil } + return nil, nil } diff --git a/internal/pkg/fleetapi/action.go b/internal/pkg/fleetapi/action.go index a7d4fdfb82f..1065dca7913 100644 --- a/internal/pkg/fleetapi/action.go +++ b/internal/pkg/fleetapi/action.go @@ -50,6 +50,10 @@ type Action interface { AckEvent() AckEvent } +// Actions is a slice of Actions to executes and allow to unmarshal +// heterogeneous action types. +type Actions []Action + // ScheduledAction is an Action that may be executed at a later date // Only ActionUpgrade implements this at the moment type ScheduledAction interface { @@ -87,6 +91,38 @@ type Signed struct { Signature string `json:"signature" yaml:"signature" mapstructure:"signature"` } +// NewAction returns a new, zero-value, action of the type defined by 'actionType' +// or an ActionUnknown with the 'OriginalType' field set to 'actionType' if the +// type is not valid. +func NewAction(actionType string) Action { + var action Action + + // keep the case statements alphabetically sorted + switch actionType { + case ActionTypeCancel: + action = &ActionCancel{} + case ActionTypeDiagnostics: + action = &ActionDiagnostics{} + case ActionTypeInputAction: + // Only INPUT_ACTION type actions could possibly be signed https://github.com/elastic/elastic-agent/pull/2348 + action = &ActionApp{} + case ActionTypePolicyChange: + action = &ActionPolicyChange{} + case ActionTypePolicyReassign: + action = &ActionPolicyReassign{} + case ActionTypeSettings: + action = &ActionSettings{} + case ActionTypeUnenroll: + action = &ActionUnenroll{} + case ActionTypeUpgrade: + action = &ActionUpgrade{} + default: + action = &ActionUnknown{OriginalType: actionType} + } + + return action +} + func newAckEvent(id, aType string) AckEvent { return AckEvent{ EventType: "ACTION_RESULT", @@ -535,9 +571,6 @@ func (a *ActionApp) MarshalMap() (map[string]interface{}, error) { return res, err } -// Actions is a list of Actions to executes and allow to unmarshal heterogenous action type. -type Actions []Action - // UnmarshalJSON takes every raw representation of an action and try to decode them. func (a *Actions) UnmarshalJSON(data []byte) error { var typeUnmarshaler []struct { @@ -559,30 +592,7 @@ func (a *Actions) UnmarshalJSON(data []byte) error { actions := make([]Action, 0, len(typeUnmarshaler)) for i, response := range typeUnmarshaler { - var action Action - - // keep the case statements alphabetically sorted - switch response.ActionType { - case ActionTypeCancel: - action = &ActionCancel{} - case ActionTypeDiagnostics: - action = &ActionDiagnostics{} - case ActionTypeInputAction: - // Only INPUT_ACTION type actions could possibly be signed https://github.com/elastic/elastic-agent/pull/2348 - action = &ActionApp{} - case ActionTypePolicyChange: - action = &ActionPolicyChange{} - case ActionTypePolicyReassign: - action = &ActionPolicyReassign{} - case ActionTypeSettings: - action = &ActionSettings{} - case ActionTypeUnenroll: - action = &ActionUnenroll{} - case ActionTypeUpgrade: - action = &ActionUpgrade{} - default: - action = &ActionUnknown{OriginalType: response.ActionType} - } + action := NewAction(response.ActionType) if err := json.Unmarshal(rawActions[i], action); err != nil { return errors.New(err, diff --git a/internal/pkg/queue/actionqueue.go b/internal/pkg/queue/actionqueue.go index b0cdc127dff..da2c84aa8c9 100644 --- a/internal/pkg/queue/actionqueue.go +++ b/internal/pkg/queue/actionqueue.go @@ -11,9 +11,9 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/fleetapi" ) -// saver is an the minimal interface needed for state storage. +// saver is the minimal interface needed for state storage. type saver interface { - SetQueue(a []fleetapi.Action) + SetQueue(a []fleetapi.ScheduledAction) Save() error } @@ -74,13 +74,9 @@ func (q *queue) Pop() interface{} { // newQueue creates a new priority queue using container/heap. // Will return an error if StartTime fails for any action. -func newQueue(actions []fleetapi.Action) (*queue, error) { +func newQueue(actions []fleetapi.ScheduledAction) (*queue, error) { q := make(queue, len(actions)) - for i, a := range actions { - action, ok := a.(fleetapi.ScheduledAction) - if !ok { - continue - } + for i, action := range actions { ts, err := action.StartTime() if err != nil { return nil, err @@ -95,8 +91,8 @@ func newQueue(actions []fleetapi.Action) (*queue, error) { return &q, nil } -// NewActionQueue creates a new queue with the passed actions using the persistor for state storage. -func NewActionQueue(actions []fleetapi.Action, s saver) (*ActionQueue, error) { +// NewActionQueue creates a new queue with the passed actions using the saver for state storage. +func NewActionQueue(actions []fleetapi.ScheduledAction, s saver) (*ActionQueue, error) { q, err := newQueue(actions) if err != nil { return nil, err @@ -149,8 +145,8 @@ func (q *ActionQueue) Cancel(actionID string) int { } // Actions returns all actions in the queue, item 0 is garunteed to be the min, the rest may not be in sorted order. -func (q *ActionQueue) Actions() []fleetapi.Action { - actions := make([]fleetapi.Action, q.q.Len()) +func (q *ActionQueue) Actions() []fleetapi.ScheduledAction { + actions := make([]fleetapi.ScheduledAction, q.q.Len()) for i, item := range *q.q { actions[i] = item.action } diff --git a/internal/pkg/queue/actionqueue_test.go b/internal/pkg/queue/actionqueue_test.go index d5a7a5c41d4..df6b5a3d1c4 100644 --- a/internal/pkg/queue/actionqueue_test.go +++ b/internal/pkg/queue/actionqueue_test.go @@ -56,7 +56,7 @@ type mockSaver struct { mock.Mock } -func (m *mockSaver) SetQueue(a []fleetapi.Action) { +func (m *mockSaver) SetQueue(a []fleetapi.ScheduledAction) { m.Called(a) } @@ -85,14 +85,14 @@ func TestNewQueue(t *testing.T) { }) t.Run("empty actions slice", func(t *testing.T) { - q, err := newQueue([]fleetapi.Action{}) + q, err := newQueue([]fleetapi.ScheduledAction{}) require.NoError(t, err) assert.NotNil(t, q) assert.Empty(t, q) }) t.Run("ordered actions list", func(t *testing.T) { - q, err := newQueue([]fleetapi.Action{a1, a2, a3}) + q, err := newQueue([]fleetapi.ScheduledAction{a1, a2, a3}) assert.NotNil(t, q) require.NoError(t, err) assert.Len(t, *q, 3) @@ -107,7 +107,7 @@ func TestNewQueue(t *testing.T) { }) t.Run("unordered actions list", func(t *testing.T) { - q, err := newQueue([]fleetapi.Action{a3, a2, a1}) + q, err := newQueue([]fleetapi.ScheduledAction{a3, a2, a1}) require.NoError(t, err) assert.NotNil(t, q) assert.Len(t, *q, 3) @@ -124,7 +124,7 @@ func TestNewQueue(t *testing.T) { t.Run("start time error", func(t *testing.T) { a := &mockAction{} a.On("StartTime").Return(time.Time{}, errors.New("oh no")) - q, err := newQueue([]fleetapi.Action{a}) + q, err := newQueue([]fleetapi.ScheduledAction{a}) assert.EqualError(t, err, "oh no") assert.Nil(t, q) })