Skip to content

Commit

Permalink
[Ingest Manager] Send updating state (elastic#21461)
Browse files Browse the repository at this point in the history
[Ingest Manager] Send updating state (elastic#21461)
  • Loading branch information
michalpristas authored Oct 5, 2020
1 parent 641d02f commit 84f6311
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 9 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@
- Add support for EQL based condition on inputs {pull}20994[20994]
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Send updating state {pull}21461[21461]
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,13 @@ func newManaged(
}

managedApplication.upgrader = upgrade.NewUpgrader(
agentInfo,
cfg.Settings.DownloadConfig,
log,
[]context.CancelFunc{managedApplication.cancelCtxFn},
reexec,
acker)
acker,
combinedReporter)

actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
Expand Down
77 changes: 69 additions & 8 deletions x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)
Expand All @@ -33,11 +34,13 @@ const (

// Upgrader performs an upgrade
type Upgrader struct {
agentInfo *info.AgentInfo
settings *artifact.Config
log *logger.Logger
closers []context.CancelFunc
reexec reexecManager
acker acker
reporter stateReporter
upgradeable bool
}

Expand All @@ -50,14 +53,19 @@ type acker interface {
Commit(ctx context.Context) error
}

type stateReporter interface {
OnStateChange(id string, name string, s state.State)
}

// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker) *Upgrader {
func NewUpgrader(agentInfo *info.AgentInfo, settings *artifact.Config, log *logger.Logger, closers []context.CancelFunc, reexec reexecManager, a acker, r stateReporter) *Upgrader {
return &Upgrader{
settings: settings,
log: log,
closers: closers,
reexec: reexec,
acker: a,
reporter: r,
upgradeable: getUpgradable(),
}
}
Expand All @@ -68,13 +76,22 @@ func (u *Upgrader) Upgradeable() bool {
}

// Upgrade upgrades running agent
func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error {
func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) (err error) {
// report failed
defer func() {
if err != nil {
u.reportFailure(ctx, a, err)
}
}()

if !u.upgradeable {
return fmt.Errorf(
"cannot be upgraded; must be installed with install sub-command and " +
"running under control of the systems supervisor")
}

u.reportUpdating(a.Version)

sourceURI, err := u.sourceURI(a.Version, a.SourceURI)
archivePath, err := u.downloadArtifact(ctx, a.Version, sourceURI)
if err != nil {
Expand All @@ -91,7 +108,10 @@ func (u *Upgrader) Upgrade(ctx context.Context, a *fleetapi.ActionUpgrade) error
}

if strings.HasPrefix(release.Commit(), newHash) {
return errors.New("upgrading to same version")
// not an error
u.ackAction(ctx, a)
u.log.Warn("upgrading to same version")
return nil
}

if err := copyActionStore(newHash); err != nil {
Expand Down Expand Up @@ -132,11 +152,7 @@ func (u *Upgrader) Ack(ctx context.Context) error {
return nil
}

if err := u.acker.Ack(ctx, marker.Action); err != nil {
return err
}

if err := u.acker.Commit(ctx); err != nil {
if err := u.ackAction(ctx, marker.Action); err != nil {
return err
}

Expand All @@ -148,6 +164,7 @@ func (u *Upgrader) Ack(ctx context.Context) error {

return ioutil.WriteFile(markerFile, markerBytes, 0600)
}

func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) {
if strings.HasSuffix(version, "-SNAPSHOT") && retrievedURI == "" {
return "", errors.New("snapshot upgrade requires source uri", errors.TypeConfig)
Expand All @@ -159,6 +176,50 @@ func (u *Upgrader) sourceURI(version, retrievedURI string) (string, error) {
return u.settings.SourceURI, nil
}

// ackAction is used for successful updates, it was either updated successfully or to the same version
// so we need to remove updating state and get prevent from receiving same update action again.
func (u *Upgrader) ackAction(ctx context.Context, action fleetapi.Action) error {
if err := u.acker.Ack(ctx, action); err != nil {
return err
}

if err := u.acker.Commit(ctx); err != nil {
return err
}

u.reporter.OnStateChange(
"",
agentName,
state.State{Status: state.Running},
)

return nil
}

// report failure is used when update process fails. action is acked so it won't be received again
// and state is changed to FAILED
func (u *Upgrader) reportFailure(ctx context.Context, action fleetapi.Action, err error) {
// ack action
u.acker.Ack(ctx, action)

// report failure
u.reporter.OnStateChange(
"",
agentName,
state.State{Status: state.Failed, Message: err.Error()},
)
}

// reportUpdating sets state of agent to updating.
func (u *Upgrader) reportUpdating(version string) {
// report failure
u.reporter.OnStateChange(
"",
agentName,
state.State{Status: state.Updating, Message: fmt.Sprintf("Update to version '%s' started", version)},
)
}

func rollbackInstall(hash string) {
os.RemoveAll(filepath.Join(paths.Data(), fmt.Sprintf("%s-%s", agentName, hash)))
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
Crashed
// Restarting is status describing application is restarting.
Restarting
// Updating is status describing application is updating.
Updating
)

// State wraps the process state and application status.
Expand Down
6 changes: 6 additions & 0 deletions x-pack/elastic-agent/pkg/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
EventSubTypeFailed = "FAILED"
// EventSubTypeStopping is an event type indicating application is stopping.
EventSubTypeStopping = "STOPPING"
// EventSubTypeUpdating is an event type indicating update process in progress.
EventSubTypeUpdating = "UPDATING"
)

type agentInfo interface {
Expand Down Expand Up @@ -127,6 +129,10 @@ func generateRecord(agentID string, id string, name string, s state.State) event
case state.Restarting:
subType = EventSubTypeStarting
subTypeText = "RESTARTING"
case state.Updating:
subType = EventSubTypeUpdating
subTypeText = EventSubTypeUpdating

}

err := errors.New(
Expand Down

0 comments on commit 84f6311

Please sign in to comment.