diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 2ba08864ae85..278a9ea9cf45 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -28,3 +28,4 @@ - Add support for EQL based condition on inputs {pull}20994[20994] - Send `fleet.host.id` to Endpoint Security {pull}21042[21042] - Add `install` and `uninstall` subcommands {pull}21206[21206] +- Send updating state {pull}21461[21461] diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 12a9c2427808..d1eaf197a886 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -200,11 +200,13 @@ func newManaged( } managedApplication.upgrader = upgrade.NewUpgrader( + agentInfo, cfg.Settings.DownloadConfig, log, []context.CancelFunc{managedApplication.cancelCtxFn}, reexec, - acker) + acker, + combinedReporter) actionDispatcher.MustRegister( &fleetapi.ActionPolicyChange{}, diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go index cac36ef7922a..7aacf77ba634 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) @@ -33,11 +34,13 @@ const ( // Upgrader performs an upgrade type Upgrader struct { + agentInfo *info.AgentInfo settings *artifact.Config log *logger.Logger closers []context.CancelFunc reexec reexecManager acker acker + reporter stateReporter upgradeable bool } @@ -50,14 +53,19 @@ type acker interface { Commit(ctx context.Context) error } +type stateReporter interface { + OnStateChange(id string, name string, s state.State) +} + // NewUpgrader creates an upgrader which is capable of performing upgrade operation -func NewUpgrader(settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker) *Upgrader { +func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader { return &Upgrader{ settings: settings, log: log, closers: closers, reexec: reexec, acker: a, + reporter: r, upgradeable: getUpgradable(), } } @@ -68,13 +76,22 @@ func (u *Upgrader) Upgradeable() bool { } // Upgrade upgrades running agent -func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error { +func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) { + // report failed + defer func() { + if err != nil { + u.reportFailure(ctx, a, err) + } + }() + if !u.upgradeable { return fmt.Errorf( "cannot be upgraded; must be installed with install sub-command and " + "running under control of the systems supervisor") } + u.reportUpdating(a.Version) + sourceURI, err := u.sourceURI(a.Version, a.SourceURI) archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI) if err != nil { @@ -91,7 +108,10 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error } if strings.HasPrefix(release.Commit(), newHash) { - return errors.New("upgrading to same version") + // not an error + u.ackAction(ctx, a) + u.log.Warn("upgrading to same version") + return nil } if err := copyActionStore(newHash); err != nil { @@ -132,11 +152,7 @@ func (u *Upgrader) Ack(ctx context.Context) error { return nil } - if err := u.acker.Ack(ctx, marker.Action); err != nil { - return err - } - - if err := u.acker.Commit(ctx); err != nil { + if err := u.ackAction(ctx, marker.Action); err != nil { return err } @@ -148,6 +164,7 @@ func (u *Upgrader) Ack(ctx context.Context) error { return ioutil.WriteFile(markerFile, markerBytes, 0600) } + func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) { if strings.HasSuffix(version, "-SNAPSHOT") && retrievedURI == "" { return "", errors.New("snapshot upgrade requires source uri", errors.TypeConfig) @@ -159,6 +176,50 @@ func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) { return u.settings.SourceURI, nil } +// ackAction is used for successful updates, it was either updated successfully or to the same version +// so we need to remove updating state and get prevent from receiving same update action again. +func (u *Upgrader) ackAction(ctx context.Context, action fleetapi.Action) error { + if err := u.acker.Ack(ctx, action); err != nil { + return err + } + + if err := u.acker.Commit(ctx); err != nil { + return err + } + + u.reporter.OnStateChange( + "", + agentName, + state.State{Status: state.Running}, + ) + + return nil +} + +// report failure is used when update process fails. action is acked so it won't be received again +// and state is changed to FAILED +func (u *Upgrader) reportFailure(ctx context.Context, action fleetapi.Action, err error) { + // ack action + u.acker.Ack(ctx, action) + + // report failure + u.reporter.OnStateChange( + "", + agentName, + state.State{Status: state.Failed, Message: err.Error()}, + ) +} + +// reportUpdating sets state of agent to updating. +func (u *Upgrader) reportUpdating(version string) { + // report failure + u.reporter.OnStateChange( + "", + agentName, + state.State{Status: state.Updating, Message: fmt.Sprintf("Update to version '%s' started", version)}, + ) +} + func rollbackInstall(hash string) { os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash))) } diff --git a/x-pack/elastic-agent/pkg/core/state/state.go b/x-pack/elastic-agent/pkg/core/state/state.go index 6b7c8bd53dec..670cdc2a2f23 100644 --- a/x-pack/elastic-agent/pkg/core/state/state.go +++ b/x-pack/elastic-agent/pkg/core/state/state.go @@ -30,6 +30,8 @@ const ( Crashed // Restarting is status describing application is restarting. Restarting + // Updating is status describing application is updating. + Updating ) // State wraps the process state and application status. diff --git a/x-pack/elastic-agent/pkg/reporter/reporter.go b/x-pack/elastic-agent/pkg/reporter/reporter.go index c36708a837f7..3b128841b2a1 100644 --- a/x-pack/elastic-agent/pkg/reporter/reporter.go +++ b/x-pack/elastic-agent/pkg/reporter/reporter.go @@ -38,6 +38,8 @@ const ( EventSubTypeFailed = "FAILED" // EventSubTypeStopping is an event type indicating application is stopping. EventSubTypeStopping = "STOPPING" + // EventSubTypeUpdating is an event type indicating update process in progress. + EventSubTypeUpdating = "UPDATING" ) type agentInfo interface { @@ -127,6 +129,10 @@ func generateRecord(agentID string, id string, name string, s state.State) event case state.Restarting: subType = EventSubTypeStarting subTypeText = "RESTARTING" + case state.Updating: + subType = EventSubTypeUpdating + subTypeText = EventSubTypeUpdating + } err := errors.New(