Skip to content

Commit

Permalink
send package.version to components (#4024)
Browse files Browse the repository at this point in the history
The agent now includes AgentInfo alongside the connection information during component startup. It also stores the BuildHash sent by the components as part of the ComponentVersionInfo during check-in.

To facilitate testing, an info.Agent interface has been created to abstract the implementation of info.AgentInfo. This abstraction allows for the creation of mocks, enabling testing scenarios where using `info.NewAgentInfo` is impractical. This is because `info.NewAgentInfo` relies on the agent vault, which, on Mac systems, is the system's keychain. Accessing the keychain requires root permissions, which are not available during testing.
  • Loading branch information
AndersonQ authored Feb 7, 2024
1 parent 8dcab21 commit 4fa6bb6
Show file tree
Hide file tree
Showing 29 changed files with 418 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
// PolicyChangeHandler is a handler for POLICY_CHANGE action.
type PolicyChangeHandler struct {
log *logger.Logger
agentInfo *info.AgentInfo
agentInfo info.Agent
config *configuration.Configuration
store storage.Store
ch chan coordinator.ConfigChange
Expand All @@ -52,7 +52,7 @@ type PolicyChangeHandler struct {
// NewPolicyChangeHandler creates a new PolicyChange handler.
func NewPolicyChangeHandler(
log *logger.Logger,
agentInfo *info.AgentInfo,
agentInfo info.Agent,
config *configuration.Configuration,
store storage.Store,
ch chan coordinator.ConfigChange,
Expand Down Expand Up @@ -264,7 +264,7 @@ func clientEqual(k1 remote.Config, k2 remote.Config) bool {
return true
}

func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) (io.Reader, error) {
func fleetToReader(agentInfo info.Agent, cfg *configuration.Configuration) (io.Reader, error) {
configToStore := map[string]interface{}{
"fleet": cfg.Fleet,
"agent": map[string]interface{}{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ func TestPolicyChange(t *testing.T) {
log, _ := logger.New("", false)
ack := noopacker.New()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
agentInfo := &info.AgentInfo{}
nullStore := &storage.NullStore{}

t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) {
Expand All @@ -63,10 +60,8 @@ func TestPolicyChange(t *testing.T) {

func TestPolicyAcked(t *testing.T) {
log, _ := logger.New("", false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
agentInfo := &info.AgentInfo{}
nullStore := &storage.NullStore{}

t.Run("Config change should ACK", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
// Settings handles settings change coming from fleet and updates log level.
type Settings struct {
log *logger.Logger
agentInfo *info.AgentInfo
agentInfo info.Agent
coord *coordinator.Coordinator
}

// NewSettings creates a new Settings handler.
func NewSettings(
log *logger.Logger,
agentInfo *info.AgentInfo,
agentInfo info.Agent,
coord *coordinator.Coordinator,
) *Settings {
return &Settings{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func TestUpgradeHandler(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(ctx, true)

agentInfo := &info.AgentInfo{}
msgChan := make(chan string)

// Create and start the coordinator
Expand Down Expand Up @@ -95,7 +96,8 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(ctx, true)

agentInfo := &info.AgentInfo{}
msgChan := make(chan string)

// Create and start the Coordinator
Expand Down Expand Up @@ -129,7 +131,8 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
agentInfo, _ := info.NewAgentInfo(ctx, true)

agentInfo := &info.AgentInfo{}
msgChan := make(chan string)

// Create and start the Coordinator
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(
log *logger.Logger,
baseLogger *logger.Logger,
logLevel logp.Level,
agentInfo *info.AgentInfo,
agentInfo info.Agent,
reexec coordinator.ReExecManager,
tracer *apm.Tracer,
testingMode bool,
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ type configReloader interface {
// All configuration changes, update variables, and upgrade actions are managed and controlled by the coordinator.
type Coordinator struct {
logger *logger.Logger
agentInfo *info.AgentInfo
agentInfo info.Agent
isManaged bool

cfg *configuration.Configuration
Expand Down Expand Up @@ -322,7 +322,7 @@ type UpdateComponentChange struct {
}

// New creates a new coordinator.
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator {
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo info.Agent, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator {
var fleetState cproto.State
var fleetMessage string
if !isManaged {
Expand Down
13 changes: 8 additions & 5 deletions internal/pkg/agent/application/coordinator/diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,9 @@ components:
assert.YAMLEq(t, expected, string(result), "components-actual diagnostic returned unexpected value")
}

// TestDiagnosticState creates a coordinator with a test state and verify that
// the state diagnostic reports it.
func TestDiagnosticState(t *testing.T) {
// Create a coordinator with a test state and verify that the state
// diagnostic reports it

now := time.Now().UTC()
state := State{
State: agentclient.Starting,
Expand All @@ -427,7 +426,8 @@ func TestDiagnosticState(t *testing.T) {
State: client.UnitStateDegraded,
Message: "degraded message",
VersionInfo: runtime.ComponentVersionInfo{
Name: "version name",
Name: "version name",
BuildHash: "a-build-hash",
},
},
},
Expand Down Expand Up @@ -461,6 +461,7 @@ components:
units: {}
version_info:
name: "version name"
build_hash: "a-build-hash"
upgrade_details:
target_version: 8.12.0
state: UPG_DOWNLOADING
Expand Down Expand Up @@ -503,7 +504,8 @@ func TestDiagnosticStateForAPM(t *testing.T) {
State: client.UnitStateDegraded,
Message: "degraded message",
VersionInfo: runtime.ComponentVersionInfo{
Name: "version name",
Name: "version name",
BuildHash: "a-build-hash",
},
Component: &proto.Component{
ApmConfig: &proto.APMConfig{
Expand Down Expand Up @@ -540,6 +542,7 @@ components:
units: {}
version_info:
name: "version name"
build_hash: "a-build-hash"
component:
apmconfig:
elastic:
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) co

// InjectFleetConfigComponentModifier The modifier that injects the fleet configuration for the components
// that need to be able to connect to fleet server.
func InjectFleetConfigComponentModifier(fleetCfg *configuration.FleetAgentConfig, agentInfo *info.AgentInfo) coordinator.ComponentsModifier {
func InjectFleetConfigComponentModifier(fleetCfg *configuration.FleetAgentConfig, agentInfo info.Agent) coordinator.ComponentsModifier {
return func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) {
hostsStr := fleetCfg.Client.GetHosts()
fleetHosts := make([]interface{}, 0, len(hostsStr))
Expand Down
23 changes: 23 additions & 0 deletions internal/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,29 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

type Agent interface {
// AgentID returns an agent identifier.
AgentID() string

// Headers returns custom headers used to communicate with elasticsearch.
Headers() map[string]string

// LogLevel retrieves a log level.
LogLevel() string

// ReloadID reloads agent info ID from configuration file.
ReloadID(ctx context.Context) error

// SetLogLevel updates log level of agent.
SetLogLevel(ctx context.Context, level string) error

// Snapshot returns if this version is a snapshot.
Snapshot() bool

// Version returns the version for this Agent.
Version() string
}

// AgentInfo is a collection of information about agent.
type AgentInfo struct {
agentID string
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const dispatchFlushInterval = time.Minute * 5

type managedConfigManager struct {
log *logger.Logger
agentInfo *info.AgentInfo
agentInfo info.Agent
cfg *configuration.Configuration
client *remote.Client
store storage.Store
Expand All @@ -60,7 +60,7 @@ type managedConfigManager struct {
func newManagedConfigManager(
ctx context.Context,
log *logger.Logger,
agentInfo *info.AgentInfo,
agentInfo info.Agent,
cfg *configuration.Configuration,
storeSaver storage.Store,
runtime *runtime.Manager,
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ type BeatsMonitor struct {
enabled bool // feature flag disabling whole v1 monitoring story
config *monitoringConfig
operatingSystem string
agentInfo *info.AgentInfo
agentInfo info.Agent
}

type monitoringConfig struct {
C *monitoringCfg.MonitoringConfig `config:"agent.monitoring"`
}

// New creates a new BeatsMonitor instance.
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo *info.AgentInfo) *BeatsMonitor {
func New(enabled bool, operatingSystem string, cfg *monitoringCfg.MonitoringConfig, agentInfo info.Agent) *BeatsMonitor {
return &BeatsMonitor{
enabled: enabled,
config: &monitoringConfig{
Expand Down Expand Up @@ -914,7 +914,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
return nil
}

func createProcessorsForJSONInput(name string, compID, monitoringNamespace string, agentInfo *info.AgentInfo) []interface{} {
func createProcessorsForJSONInput(name string, compID, monitoringNamespace string, agentInfo info.Agent) []interface{} {
return []interface{}{
map[string]interface{}{
"add_fields": map[string]interface{}{
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var ErrSameVersion = errors.New("upgrade did not occur because its the same vers
type Upgrader struct {
log *logger.Logger
settings *artifact.Config
agentInfo *info.AgentInfo
agentInfo info.Agent
upgradeable bool
fleetServerURI string
markerWatcher MarkerWatcher
Expand All @@ -67,7 +67,7 @@ func IsUpgradeable() bool {
}

// NewUpgrader creates an upgrader which is capable of performing upgrade operation
func NewUpgrader(log *logger.Logger, settings *artifact.Config, agentInfo *info.AgentInfo) (*Upgrader, error) {
func NewUpgrader(log *logger.Logger, settings *artifact.Config, agentInfo info.Agent) (*Upgrader, error) {
return &Upgrader{
log: log,
settings: settings,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/storage/encrypted_disk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (d *EncryptedDiskStore) Load() (rc io.ReadCloser, err error) {
fd, err := os.OpenFile(d.target, os.O_RDONLY, perms)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// If file doesn't exists, return empty reader closer
// If file doesn't exist, return empty reader closer
return io.NopCloser(bytes.NewReader([]byte{})), nil
}
return nil, errors.New(err,
Expand Down
13 changes: 10 additions & 3 deletions internal/pkg/agent/vault/vault_notdarwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,25 @@ func New(ctx context.Context, path string, opts ...OptionFunc) (v *Vault, err er
func (v *Vault) Set(ctx context.Context, key string, data []byte) (err error) {
enc, err := v.encrypt(data)
if err != nil {
return err
return fmt.Errorf("vault Set: could not encrypt key: %w", err)
}

err = v.tryLock(ctx)
if err != nil {
return err
return fmt.Errorf("vault Set: could acquire lock: %w", err)
}
defer func() {
err = v.unlockAndJoinErrors(err)
if err != nil {
err = fmt.Errorf("vault Set: unlockAndJoinErrors failed: %w", err)
}
}()

return writeFile(v.filepathFromKey(key), enc)
err = writeFile(v.filepathFromKey(key), enc)
if err != nil {
return fmt.Errorf("vaukt: could not write key to file: %w", err)
}
return nil
}

// Get retrieves the key from the vault store
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (c *commandRuntime) stop(ctx context.Context) error {

func (c *commandRuntime) startWatcher(info *process.Info, comm Communicator) {
go func() {
err := comm.WriteConnInfo(info.Stdin)
err := comm.WriteStartUpInfo(info.Stdin)
if err != nil {
_, _ = c.logErr.Write([]byte(fmt.Sprintf("Failed: failed to provide connection information to spawned pid '%d': %s", info.PID, err)))
// kill instantly
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/conn_info_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newConnInfoServer(log *logger.Logger, comm Communicator, port int) (*connIn
break
}
log.Debugf("client connected, sending connection info")
err = comm.WriteConnInfo(conn)
err = comm.WriteStartUpInfo(conn)
if err != nil {
if !errors.Is(err, io.EOF) {
log.Errorf("failed write conn info: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/conn_info_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newMockCommunicator() *mockCommunicator {
}
}

func (c *mockCommunicator) WriteConnInfo(w io.Writer, services ...client.Service) error {
func (c *mockCommunicator) WriteStartUpInfo(w io.Writer, services ...client.Service) error {
infoBytes, err := protobuf.Marshal(c.startupInfo)
if err != nil {
return fmt.Errorf("failed to marshal connection information: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions pkg/component/runtime/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type Manager struct {
ca *authority.CertificateAuthority
listenAddr string
listenPort int
agentInfo *info.AgentInfo
agentInfo info.Agent
tracer *apm.Tracer
monitor MonitoringManager
grpcConfig *configuration.GRPCConfig
Expand Down Expand Up @@ -150,7 +150,7 @@ func NewManager(
logger,
baseLogger *logger.Logger,
listenAddr string,
agentInfo *info.AgentInfo,
agentInfo info.Agent,
tracer *apm.Tracer,
monitor MonitoringManager,
grpcConfig *configuration.GRPCConfig,
Expand All @@ -159,6 +159,10 @@ func NewManager(
if err != nil {
return nil, err
}

if agentInfo == nil {
return nil, errors.New("agentInfo cannot be nil")
}
m := &Manager{
logger: logger,
baseLogger: baseLogger,
Expand Down
Loading

0 comments on commit 4fa6bb6

Please sign in to comment.