Skip to content

Commit

Permalink
Fix Fleet Enrollment Handling for Containerized Agent (#6568)
Browse files Browse the repository at this point in the history
* feat: add k8s integration test to check fleet enrollment

* fix: container correct fleet enrollment when token changes or the agent is unenrolled

* fix: update TestDiagnosticLocalConfig to include enrollment_token_hash

* feat: add a simple retry logic while validate the stored agent api token

* feat: add unit-test for shouldFleetEnroll

* fix: improve unit-test explicitness and check for expected number of calls

* fix: kind in changelog fragment

* fix: split up ack-ing fleet in a separate function
  • Loading branch information
pkoutsovasilis authored Jan 28, 2025
1 parent 69e7612 commit 17814cc
Show file tree
Hide file tree
Showing 18 changed files with 1,376 additions and 49 deletions.
8 changes: 7 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ packages:
github.com/elastic/elastic-agent/pkg/control/v2/client:
interfaces:
Client:
github.com/elastic/elastic-agent/internal/pkg/fleetapi/client:
interfaces:
Sender:
github.com/elastic/elastic-agent/internal/pkg/agent/storage:
interfaces:
Storage:
github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers:
interfaces:
Uploader:
Expand All @@ -22,4 +28,4 @@ packages:
Acker:
github.com/elastic/elastic-agent/internal/pkg/agent/application/info:
interfaces:
Agent:
Agent:
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix enrollment for containerised agent when enrollment token changes or the agent is unenrolled

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6568

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3586
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func TestDiagnosticLocalConfig(t *testing.T) {
// local-config hook correctly returns it.
cfg := &configuration.Configuration{
Fleet: &configuration.FleetAgentConfig{
Enabled: true,
AccessAPIKey: "test-key",
Enabled: true,
AccessAPIKey: "test-key",
EnrollmentTokenHash: "test-hash",
Client: remote.Config{
Protocol: "test-protocol",
},
Expand Down Expand Up @@ -119,6 +120,7 @@ agent:
fleet:
enabled: true
access_api_key: "test-key"
enrollment_token_hash: "test-hash"
agent:
protocol: "test-protocol"
`
Expand Down
188 changes: 171 additions & 17 deletions internal/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package cmd

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand All @@ -15,12 +17,14 @@ import (
"os/exec"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"

Expand All @@ -31,8 +35,13 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/crypto"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
fleetclient "github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/remote"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/core/process"
Expand Down Expand Up @@ -270,36 +279,27 @@ func containerCmd(streams *cli.IOStreams) error {

func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error {
var err error
var client *kibana.Client
executable, err := os.Executable()
if err != nil {
return err
}

initTimeout := envTimeout(fleetInitTimeoutName)

_, err = os.Stat(paths.AgentConfigFile())
if !os.IsNotExist(err) && !cfg.Fleet.Force {
// already enrolled, just run the standard run
return run(containerCfgOverrides, false, initTimeout, isContainer)
}

if cfg.FleetServer.Enable {
err = ensureServiceToken(streams, &cfg)
if err != nil {
return err
}
}

if cfg.Fleet.Enroll {
shouldEnroll, err := shouldFleetEnroll(cfg)
if err != nil {
return err
}
if shouldEnroll {
var policy *kibanaPolicy
token := cfg.Fleet.EnrollmentToken
if token == "" && !cfg.FleetServer.Enable {
if client == nil {
client, err = kibanaClient(cfg.Kibana, cfg.Kibana.Headers)
if err != nil {
return err
}
client, err := kibanaClient(cfg.Kibana, cfg.Kibana.Headers)
if err != nil {
return err
}
policy, err = kibanaFetchPolicy(cfg, client, streams)
if err != nil {
Expand All @@ -318,6 +318,11 @@ func runContainerCmd(streams *cli.IOStreams, cfg setupConfig) error {
logInfo(streams, "Policy selected for enrollment: ", policyID)
}

executable, err := os.Executable()
if err != nil {
return err
}

cmdArgs, err := buildEnrollArgs(cfg, token, policyID)
if err != nil {
return err
Expand Down Expand Up @@ -989,3 +994,152 @@ func isContainer(detail component.PlatformDetail) component.PlatformDetail {
detail.OS = component.Container
return detail
}

var (
newFleetClient = func(log *logger.Logger, apiKey string, cfg remote.Config) (fleetclient.Sender, error) {
return fleetclient.NewAuthWithConfig(log, apiKey, cfg)
}
newEncryptedDiskStore = storage.NewEncryptedDiskStore
statAgentConfigFile = os.Stat
)

// agentInfo implements the AgentInfo interface, and it used in shouldFleetEnroll.
type agentInfo struct {
id string
}

func (a *agentInfo) AgentID() string {
return a.id
}

// shouldFleetEnroll returns true if the elastic-agent should enroll to fleet.
func shouldFleetEnroll(setupCfg setupConfig) (bool, error) {
if !setupCfg.Fleet.Enroll {
// Enrollment is explicitly disabled in the setup configuration.
return false, nil
}

if setupCfg.Fleet.Force {
// Enrollment is explicitly enforced by the setup configuration.
return true, nil
}

agentCfgFilePath := paths.AgentConfigFile()
_, err := statAgentConfigFile(agentCfgFilePath)
if os.IsNotExist(err) {
// The agent configuration file does not exist, so enrollment is required.
return true, nil
}

ctx := context.Background()
store, err := newEncryptedDiskStore(ctx, agentCfgFilePath)
if err != nil {
return false, fmt.Errorf("failed to instantiate encrypted disk store: %w", err)
}

reader, err := store.Load()
if err != nil {
return false, fmt.Errorf("failed to load from disk store: %w", err)
}

cfg, err := config.NewConfigFrom(reader)
if err != nil {
return false, fmt.Errorf("failed to read from disk store: %w", err)
}

storedConfig, err := configuration.NewFromConfig(cfg)
if err != nil {
return false, fmt.Errorf("failed to read from disk store: %w", err)
}

storedFleetHosts := storedConfig.Fleet.Client.GetHosts()
if len(storedFleetHosts) == 0 || !slices.Contains(storedFleetHosts, setupCfg.Fleet.URL) {
// The Fleet URL in the setup does not exist in the stored configuration, so enrollment is required.
return true, nil
}

// Evaluate the stored enrollment token hash against the setup enrollment token if both are present.
// Note that when "upgrading" from an older agent version the enrollment token hash will not exist
// in the stored configuration.
if len(storedConfig.Fleet.EnrollmentTokenHash) > 0 && len(setupCfg.Fleet.EnrollmentToken) > 0 {
enrollmentHashBytes, err := base64.StdEncoding.DecodeString(storedConfig.Fleet.EnrollmentTokenHash)
if err != nil {
return false, fmt.Errorf("failed to decode hash: %w", err)
}

err = crypto.ComparePBKDF2HashAndPassword(enrollmentHashBytes, []byte(setupCfg.Fleet.EnrollmentToken))
switch {
case errors.Is(err, crypto.ErrMismatchedHashAndPassword):
// The stored enrollment token hash does not match the new token, so enrollment is required.
return true, nil
case err != nil:
return false, fmt.Errorf("failed to compare hash: %w", err)
}
}

// Validate the stored API token to check if the agent is still authorized with Fleet.
log, err := logger.New("fleet_client", false)
if err != nil {
return false, fmt.Errorf("failed to create logger: %w", err)
}
fc, err := newFleetClient(log, storedConfig.Fleet.AccessAPIKey, storedConfig.Fleet.Client)
if err != nil {
return false, fmt.Errorf("failed to create fleet client: %w", err)
}

// Perform an ACK request with **empty events** to verify the validity of the API token.
// If the agent has been manually un-enrolled through the Kibana UI, the ACK request will fail due to an invalid API token.
// In such cases, the agent should automatically re-enroll and "recover" their enrollment status without manual intervention,
// maintaining seamless operation.
err = ackFleet(ctx, fc, storedConfig.Fleet.Info.ID)
switch {
case errors.Is(err, fleetclient.ErrInvalidAPIKey):
// The API key is invalid, so enrollment is required.
return true, nil
case err != nil:
return false, fmt.Errorf("failed to validate api token: %w", err)
}

// Update the stored enrollment token hash if there is no previous enrollment token hash
// (can happen when "upgrading" from an older version of the agent) and setup enrollment token is present.
if len(storedConfig.Fleet.EnrollmentTokenHash) == 0 && len(setupCfg.Fleet.EnrollmentToken) > 0 {
enrollmentHashBytes, err := crypto.GeneratePBKDF2FromPassword([]byte(setupCfg.Fleet.EnrollmentToken))
if err != nil {
return false, errors.New("failed to generate enrollment hash")
}
enrollmentTokenHash := base64.StdEncoding.EncodeToString(enrollmentHashBytes)
storedConfig.Fleet.EnrollmentTokenHash = enrollmentTokenHash

data, err := yaml.Marshal(storedConfig)
if err != nil {
return false, errors.New("could not marshal config")
}

if err := safelyStoreAgentInfo(store, bytes.NewReader(data)); err != nil {
return false, fmt.Errorf("failed to store agent config: %w", err)
}
}

return false, nil
}

// ackFleet performs an ACK request to the fleet server with **empty events**.
func ackFleet(ctx context.Context, client fleetclient.Sender, agentID string) error {
const retryInterval = time.Second
const maxRetries = 3
ackRequest := &fleetapi.AckRequest{Events: nil}
ackCMD := fleetapi.NewAckCmd(&agentInfo{agentID}, client)
retries := 0
return backoff.Retry(func() error {
retries++
_, err := ackCMD.Execute(ctx, ackRequest)
switch {
case err == nil:
return nil
case errors.Is(err, fleetclient.ErrInvalidAPIKey) || retries == maxRetries:
return backoff.Permanent(err)
default:
return err
}
}, &backoff.ConstantBackOff{Interval: retryInterval})
}
Loading

0 comments on commit 17814cc

Please sign in to comment.