Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert removal of metrics-monitoring change, fix tests to account for fleet naming changes #4462

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: add monitoring beats to usage metrics reporting

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: monitoring

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4326

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4082
49 changes: 31 additions & 18 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"

monitoringMetricsUnitID = "metrics-monitoring"
monitoringFilesUnitsID = "filestream-monitoring"

windowsOS = "windows"

// metricset execution period used for the monitoring metrics inputs
Expand Down Expand Up @@ -301,7 +304,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []

streams := []interface{}{
map[string]interface{}{
idKey: "filestream-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
"paths": []interface{}{
filepath.Join(logsDrop, agentName+"-*.ndjson"),
Expand Down Expand Up @@ -439,7 +442,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.InputSpec.BinaryName, "-", "_"), "/", "_") // conform with index naming policy
dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName)
streams = append(streams, map[string]interface{}{
idKey: fmt.Sprintf("filestream-monitoring-%s", comp.ID),
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, comp.ID),
"type": "filestream",
"paths": []interface{}{
comp.InputSpec.Spec.Service.Log.Path,
Expand Down Expand Up @@ -492,8 +495,8 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []

inputs := []interface{}{
map[string]interface{}{
idKey: "filestream-monitoring-agent",
"name": "filestream-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
useOutputKey: monitoringOutput,
"streams": streams,
Expand Down Expand Up @@ -522,14 +525,13 @@ func (b *BeatsMonitor) monitoringNamespace() string {
}

func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error {

metricsCollectionIntervalString := metricsCollectionInterval.String()
monitoringNamespace := b.monitoringNamespace()
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
streams := []interface{}{
map[string]interface{}{
idKey: "metrics-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
Expand Down Expand Up @@ -606,7 +608,18 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
},
},
}
for unit, binaryName := range componentIDToBinary {

//create a new map with the monitoring beats included
componentListWithMonitoring := map[string]string{
fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat",
fmt.Sprintf("http/%s", monitoringMetricsUnitID): "metricbeat",
monitoringFilesUnitsID: "filebeat",
}
for k, v := range componentIDToBinary {
componentListWithMonitoring[k] = v
}

for unit, binaryName := range componentListWithMonitoring {
if !isSupportedMetricsBinary(binaryName) {
continue
}
Expand All @@ -616,7 +629,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI

if isSupportedBeatsBinary(binaryName) {
beatsStreams = append(beatsStreams, map[string]interface{}{
idKey: "metrics-monitoring-" + name,
idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + name,
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
Expand Down Expand Up @@ -678,7 +691,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
}

streams = append(streams, map[string]interface{}{
idKey: "metrics-monitoring-" + name + "-1",
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
Expand Down Expand Up @@ -748,7 +761,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
if strings.EqualFold(name, "filebeat") {
fbDataStreamName := "filebeat_input"
streams = append(streams, map[string]interface{}{
idKey: "metrics-monitoring-" + name + "-1",
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fbDataStreamName),
Expand Down Expand Up @@ -832,7 +845,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
// note: this doesn't fetch anything from the /state endpoint, as it doesn't report much beyond name/version,
// the equivalent of the beat /state metrics end up in /shipper
shipperHTTPStreams = append(shipperHTTPStreams, map[string]interface{}{
idKey: "metrics-monitoring-shipper",
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
Expand All @@ -846,7 +859,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
"processors": createProcessorsForJSONInput(name, comp.ID, monitoringNamespace, b.agentInfo),
},
map[string]interface{}{
idKey: "metrics-monitoring-shipper-stats",
idKey: fmt.Sprintf("%s-shipper-stats", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
Expand All @@ -864,8 +877,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI

inputs := []interface{}{
map[string]interface{}{
idKey: "metrics-monitoring-beats",
"name": "metrics-monitoring-beats",
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
"type": "beat/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
Expand All @@ -874,8 +887,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
"streams": beatsStreams,
},
map[string]interface{}{
idKey: "metrics-monitoring-agent",
"name": "metrics-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"type": "http/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
Expand All @@ -888,8 +901,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
// if we have shipper data, inject the extra inputs
if len(shipperHTTPStreams) > 0 {
inputs = append(inputs, map[string]interface{}{
idKey: "metrics-monitoring-shipper",
"name": "metrics-monitoring-shipper",
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
"type": "http/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
Expand Down
29 changes: 19 additions & 10 deletions internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,16 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
if _, exists := processor["add_fields"]; !exists {
continue
}
p := Processor{}
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &p); err != nil {
streamProc := Processor{}
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &streamProc); err != nil {
t.Errorf("could not decode processor config: %q, err: %s", "foo", err)
}
if p.AddFields.Target != "component" {
if streamProc.AddFields.Target != "component" {
continue
}

binary := p.AddFields.Fields.Binary
componentID := p.AddFields.Fields.ID
binary := streamProc.AddFields.Fields.Binary
componentID := streamProc.AddFields.Fields.ID

// The elastic-Agent is a special case, handle it first
if strings.Contains(streamID, "monitoring-agent") {
Expand All @@ -186,11 +186,20 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
}
continue
}
if binary != "filebeat" {
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
}
if componentID != "filestream-default" {
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
if !strings.Contains(componentID, "monitoring") {
if binary != "filebeat" {
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
}
if componentID != "filestream-default" {
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
}
} else {
if binary != "filebeat" && binary != "metricbeat" {
t.Errorf("expected monitoring compoent to be metricbeat or filebeat, got %s", binary)
}
if componentID != monitoringFilesUnitsID && componentID != "beat/metrics-monitoring" && componentID != "http/metrics-monitoring" {
t.Errorf("got unxpected monitoring component ID: %s", componentID)
}
}

}
Expand Down
16 changes: 9 additions & 7 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, indexPattern, client)
return PerformQueryForRawQuery(ctx, queryRaw, indexPattern, client)
}

// GetIndexTemplatesForPattern lists all index templates on the system
Expand Down Expand Up @@ -362,7 +362,7 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)

}

Expand Down Expand Up @@ -434,7 +434,7 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
}

// GetLogsForDataset returns any logs associated with the datastream
Expand Down Expand Up @@ -525,7 +525,7 @@ func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.I
},
}

return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
}

// GetLogsForIndexWithContext returns any logs that match the given condition
Expand All @@ -536,7 +536,7 @@ func GetLogsForIndexWithContext(ctx context.Context, client elastictransport.Int
},
}

return performQueryForRawQuery(ctx, indexQuery, index, client)
return PerformQueryForRawQuery(ctx, indexQuery, index, client)
}

// GetPing performs a basic ping and returns ES config info
Expand All @@ -561,7 +561,8 @@ func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, erro

}

func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
// PerformQueryForRawQuery executes the ES query specified by queryRaw
func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(queryRaw)
if err != nil {
Expand All @@ -576,6 +577,7 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
es.Search.WithSize(300),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
Expand Down Expand Up @@ -613,7 +615,7 @@ func FindMatchingLogLinesForAgentWithContext(ctx context.Context, client elastic
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
}

// GetLogsForDatastream returns any logs associated with the datastream
Expand Down
32 changes: 32 additions & 0 deletions pkg/testing/tools/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"time"

"github.com/elastic/elastic-agent-libs/kibana"
Expand Down Expand Up @@ -89,3 +90,34 @@ func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, err

return dashboards, nil
}

// InstallPackageFromDefaultFile allows for a test ideom where a JSON policy file can be loaded, and then updated with variables that are specific to a given test.
// This can allow a single JSON policy file to be reused across multiple tests.
// existingPolicyID should be the ID of an agent policy that was already created with InstallAgentWithPolicy()
func InstallPackageFromDefaultFile(ctx context.Context, client *kibana.Client, packagePolicyName string, packageVersion string, policyJsonPath string, policyUUID string, existingPolicyID string) (kibana.PackagePolicyResponse, error) {
installPackage := kibana.PackagePolicyRequest{}

jsonRaw, err := os.ReadFile(policyJsonPath)
if err != nil {
return kibana.PackagePolicyResponse{}, fmt.Errorf("error reading JSON policy file: %w", err)
}

err = json.Unmarshal(jsonRaw, &installPackage)
if err != nil {
return kibana.PackagePolicyResponse{}, fmt.Errorf("error unmarshaling json: %w", err)
}

installPackage.Package.Version = packageVersion
installPackage.ID = policyUUID
installPackage.PolicyID = existingPolicyID
installPackage.Namespace = "default"
installPackage.Name = fmt.Sprintf("%s-test-%s", packagePolicyName, policyUUID)
installPackage.Vars = map[string]interface{}{}

resp, err := client.InstallFleetPackage(ctx, installPackage)
if err != nil {
return kibana.PackagePolicyResponse{}, fmt.Errorf("error installing fleet package: %w", err)
}

return resp, nil
}
26 changes: 2 additions & 24 deletions testing/integration/agent_long_running_leak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package integration
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -122,33 +121,12 @@ func (runner *ExtendedRunner) SetupSuite() {
policyResp, err := tools.InstallAgentWithPolicy(ctx, runner.T(), installOpts, runner.agentFixture, runner.info.KibanaClient, basePolicy)
require.NoError(runner.T(), err)

// install system package
runner.InstallPackage(ctx, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)

// install cef
runner.InstallPackage(ctx, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)

}

func (runner *ExtendedRunner) InstallPackage(ctx context.Context, name string, version string, cfgFile string, policyUUID string, policyID string) {
installPackage := kibana.PackagePolicyRequest{}

jsonRaw, err := os.ReadFile(cfgFile)
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)
require.NoError(runner.T(), err)

err = json.Unmarshal(jsonRaw, &installPackage)
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)
require.NoError(runner.T(), err)

installPackage.Package.Version = version
installPackage.ID = policyUUID
installPackage.PolicyID = policyID
installPackage.Namespace = "default"
installPackage.Name = fmt.Sprintf("%s-long-test-%s", name, policyUUID)
installPackage.Vars = map[string]interface{}{}

runner.T().Logf("Installing %s package....", name)
_, err = runner.info.KibanaClient.InstallFleetPackage(ctx, installPackage)
require.NoError(runner.T(), err, "error creating fleet package")
}

func (runner *ExtendedRunner) TestHandleLeak() {
Expand Down
Loading
Loading