Skip to content

Commit

Permalink
[Elastic Agent] Support an application spec to be ran through service…
Browse files Browse the repository at this point in the history
… manager (eg. Endpoint) (elastic#19205) (elastic#19487)

* Initial spec parsing for endpoint.

* Update comment.

* Fix spec test.

* Update code so it copies the entire input.

* Fix ast test.

* Merge agent-improve-restart-loop

* Merge agent-endpoint-spec

* Refactor core/plugin/app into mostly core/ and use core/plugin for different app types.

* Work on endpoint service application.

* More fixes.

* Fix format and tests.

* Fix some imports.

* More cleanups.

* Fix export comment.

* Pass the program.Spec into the descriptor.

* Fix some small issues with service app.

* Fix lint and tests.

* Remove the code no longer needed because of newer config format.

* Fix rules and review.

(cherry picked from commit ac51570)
  • Loading branch information
blakerouse authored Jun 29, 2020
1 parent 7a9e29d commit d831809
Show file tree
Hide file tree
Showing 60 changed files with 674 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/authority"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/authority"
)

type mockStore struct {
Expand Down
40 changes: 40 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_decorator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package application

import (
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func injectFleet(cfg *config.Config) func(*logger.Logger, *transpiler.AST) error {
return func(logger *logger.Logger, rootAst *transpiler.AST) error {
config, err := cfg.ToMapStr()
if err != nil {
return err
}
ast, err := transpiler.NewAST(config)
if err != nil {
return err
}
api, ok := transpiler.Lookup(ast, "api")
if !ok {
return fmt.Errorf("failed to get api from fleet config")
}
agentInfo, ok := transpiler.Lookup(ast, "agent_info")
if !ok {
return fmt.Errorf("failed to get agent_info from fleet config")
}
fleet := transpiler.NewDict([]transpiler.Node{agentInfo, api})
err = transpiler.Insert(rootAst, fleet, "fleet")
if err != nil {
return err
}
return nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop"
)

// IntrospectOutputCmd is an introspect subcommand that shows configurations of the agent.
Expand Down
6 changes: 3 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
Expand Down Expand Up @@ -81,7 +81,7 @@ func newLocal(
}

localApplication.bgContext, localApplication.cancelCtxFn = context.WithCancel(ctx)
localApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
localApplication.srv, err = server.NewFromConfig(log, rawConfig, &operation.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}
Expand Down
9 changes: 4 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (
"net/http"
"net/url"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
Expand Down Expand Up @@ -118,7 +117,7 @@ func newManaged(
}

managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx)
managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &operation.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener", errors.TypeNetwork)
}
Expand Down Expand Up @@ -151,7 +150,7 @@ func newManaged(
router,
&configModifiers{
Decorators: []decoratorFunc{injectMonitoring},
Filters: []filterFunc{filters.ConstraintFilter},
Filters: []filterFunc{injectFleet(config), filters.ConstraintFilter},
},
monitor,
)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

type operatorStream struct {
Expand Down
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/configrequest/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package configrequest

import "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"

const (
// StepRun is a name of Start program event
StepRun = "sc-run"
Expand All @@ -20,12 +22,12 @@ type Step struct {
ID string
// Version is a version of a program
Version string
// Process defines a process such as `filebeat`
Process string
// Spec for the program
ProgramSpec program.Spec
// Meta contains additional data such as version, configuration or tags.
Meta map[string]interface{}
}

func (s *Step) String() string {
return "[ID:" + s.ID + ", PROCESS: " + s.Process + " VERSION:" + s.Version + "]"
return "[ID:" + s.ID + ", PROCESS: " + s.ProgramSpec.Cmd + " VERSION:" + s.Version + "]"
}
18 changes: 11 additions & 7 deletions x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"

operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
)

var downloadPath = getAbsPath("tests/downloads")
Expand Down Expand Up @@ -60,7 +61,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
if err != nil {
t.Fatal(err)
}
srv, err := server.New(l, ":0", &app.ApplicationStatusHandler{})
srv, err := server.New(l, ":0", &ApplicationStatusHandler{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -104,7 +105,10 @@ func getProgram(binary, version string) *app.Descriptor {
OperatingSystem: "darwin",
Architecture: "32",
}
return app.NewDescriptor(binary, version, downloadCfg, nil)
return app.NewDescriptor(program.Spec{
Name: binary,
Cmd: binary,
}, version, downloadCfg, nil)
}

func getAbsPath(path string) string {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package config
import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/retry"
)

// Config is an operator configuration
Expand Down
27 changes: 17 additions & 10 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app"
)

const (
Expand Down Expand Up @@ -37,7 +38,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
if err != nil {
return errors.New(err,
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, step.Process),
errors.M(errors.MetaKeyAppName, step.ProgramSpec.Cmd),
"operator.handleStartSidecar failed to create program")
}

Expand All @@ -46,13 +47,13 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
if err := o.stop(p); err != nil {
result = multierror.Append(err, err)
} else {
o.markStopMonitoring(step.Process)
o.markStopMonitoring(step.ProgramSpec.Cmd)
}
} else {
if err := o.start(p, cfg); err != nil {
result = multierror.Append(err, err)
} else {
o.markStartMonitoring(step.Process)
o.markStartMonitoring(step.ProgramSpec.Cmd)
}
}
}
Expand All @@ -66,15 +67,15 @@ func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
if err != nil {
return errors.New(err,
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, step.Process),
errors.M(errors.MetaKeyAppName, step.ProgramSpec.Cmd),
"operator.handleStopSidecar failed to create program")
}

o.logger.Debugf("stopping program %v", p)
if err := o.stop(p); err != nil {
result = multierror.Append(err, err)
} else {
o.markStopMonitoring(step.Process)
o.markStopMonitoring(step.ProgramSpec.Cmd)
}
}

Expand All @@ -97,7 +98,7 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S

outputIface, found := config[outputKey]
if !found {
o.logger.Errorf("operator.getMonitoringSteps: monitoring configuration not found for sidecar of type %s", step.Process)
o.logger.Errorf("operator.getMonitoringSteps: monitoring configuration not found for sidecar of type %s", step.ProgramSpec.Cmd)
return nil
}

Expand All @@ -109,7 +110,7 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S

output, found := outputMap["elasticsearch"]
if !found {
o.logger.Error("operator.getMonitoringSteps: monitoring is missing an elasticsearch output configuration configuration for sidecar of type: %s", step.Process)
o.logger.Error("operator.getMonitoringSteps: monitoring is missing an elasticsearch output configuration configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

Expand All @@ -131,7 +132,10 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
filebeatStep := configrequest.Step{
ID: stepID,
Version: version,
Process: logsProcessName,
ProgramSpec: program.Spec{
Name: logsProcessName,
Cmd: logsProcessName,
},
Meta: map[string]interface{}{
configrequest.MetaConfigKey: fbConfig,
},
Expand All @@ -149,7 +153,10 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
metricbeatStep := configrequest.Step{
ID: stepID,
Version: version,
Process: metricsProcessName,
ProgramSpec: program.Spec{
Name: metricsProcessName,
Cmd: metricsProcessName,
},
Meta: map[string]interface{}{
configrequest.MetaConfigKey: mbConfig,
},
Expand Down
23 changes: 13 additions & 10 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
monitoringConfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring"
monitoringConfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/retry"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

func TestGenerateSteps(t *testing.T) {
Expand Down Expand Up @@ -54,13 +55,13 @@ func TestGenerateSteps(t *testing.T) {
var fbFound, mbFound bool
for _, s := range steps {
// Filebeat step check
if s.Process == "filebeat" {
if s.ProgramSpec.Cmd == "filebeat" {
fbFound = true
checkStep(t, "filebeat", sampleOutput, s)
}

// Metricbeat step check
if s.Process == "metricbeat" {
if s.ProgramSpec.Cmd == "metricbeat" {
mbFound = true
checkStep(t, "metricbeat", sampleOutput, s)
}
Expand Down Expand Up @@ -125,7 +126,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M
if err != nil {
t.Fatal(err)
}
srv, err := server.New(l, ":0", &app.ApplicationStatusHandler{})
srv, err := server.New(l, ":0", &ApplicationStatusHandler{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -156,6 +157,8 @@ func (*testMonitorableApp) Configure(_ context.Context, config map[string]interf
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string) {
}

type testMonitor struct {
monitorLogs bool
Expand Down
Loading

0 comments on commit d831809

Please sign in to comment.