Skip to content

Commit

Permalink
Add metrics-monitoring beats to resource monitoring (elastic#4326)
Browse files Browse the repository at this point in the history
* add monitoring beats to resources

* finish tests, use constant for names in monitoring

* fix headers, add changelog

* fix tests

* refactor tests

* oops

* refine es query

* use const for tests

* formatting

* adjust document check
  • Loading branch information
fearful-symmetry authored Mar 20, 2024
1 parent 0159e54 commit 4594088
Show file tree
Hide file tree
Showing 8 changed files with 1,059 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: add monitoring beats to usage metrics reporting

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: monitoring

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/4326

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/4082
49 changes: 31 additions & 18 deletions internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"

monitoringMetricsUnitID = "metrics-monitoring"
monitoringFilesUnitsID = "filestream-monitoring"

windowsOS = "windows"

// metricset execution period used for the monitoring metrics inputs
Expand Down Expand Up @@ -301,7 +304,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []

streams := []interface{}{
map[string]interface{}{
idKey: "filestream-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
"paths": []interface{}{
filepath.Join(logsDrop, agentName+"-*.ndjson"),
Expand Down Expand Up @@ -439,7 +442,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
fixedBinaryName := strings.ReplaceAll(strings.ReplaceAll(comp.InputSpec.BinaryName, "-", "_"), "/", "_") // conform with index naming policy
dataset := fmt.Sprintf("elastic_agent.%s", fixedBinaryName)
streams = append(streams, map[string]interface{}{
idKey: fmt.Sprintf("filestream-monitoring-%s", comp.ID),
idKey: fmt.Sprintf("%s-%s", monitoringFilesUnitsID, comp.ID),
"type": "filestream",
"paths": []interface{}{
comp.InputSpec.Spec.Service.Log.Path,
Expand Down Expand Up @@ -492,8 +495,8 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []

inputs := []interface{}{
map[string]interface{}{
idKey: "filestream-monitoring-agent",
"name": "filestream-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"name": fmt.Sprintf("%s-agent", monitoringFilesUnitsID),
"type": "filestream",
useOutputKey: monitoringOutput,
"streams": streams,
Expand Down Expand Up @@ -522,14 +525,13 @@ func (b *BeatsMonitor) monitoringNamespace() string {
}

func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutputName string, componentList []component.Component) error {

metricsCollectionIntervalString := metricsCollectionInterval.String()
monitoringNamespace := b.monitoringNamespace()
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
streams := []interface{}{
map[string]interface{}{
idKey: "metrics-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
Expand Down Expand Up @@ -606,7 +608,18 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
},
},
}
for unit, binaryName := range componentIDToBinary {

//create a new map with the monitoring beats included
componentListWithMonitoring := map[string]string{
fmt.Sprintf("beat/%s", monitoringMetricsUnitID): "metricbeat",
fmt.Sprintf("http/%s", monitoringMetricsUnitID): "metricbeat",
monitoringFilesUnitsID: "filebeat",
}
for k, v := range componentIDToBinary {
componentListWithMonitoring[k] = v
}

for unit, binaryName := range componentListWithMonitoring {
if !isSupportedMetricsBinary(binaryName) {
continue
}
Expand All @@ -616,7 +629,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI

if isSupportedBeatsBinary(binaryName) {
beatsStreams = append(beatsStreams, map[string]interface{}{
idKey: "metrics-monitoring-" + name,
idKey: fmt.Sprintf("%s-", monitoringMetricsUnitID) + name,
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
Expand Down Expand Up @@ -678,7 +691,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
}

streams = append(streams, map[string]interface{}{
idKey: "metrics-monitoring-" + name + "-1",
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fixedAgentName),
Expand Down Expand Up @@ -748,7 +761,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
if strings.EqualFold(name, "filebeat") {
fbDataStreamName := "filebeat_input"
streams = append(streams, map[string]interface{}{
idKey: "metrics-monitoring-" + name + "-1",
idKey: fmt.Sprintf("%s-%s-1", monitoringMetricsUnitID, name),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", fbDataStreamName),
Expand Down Expand Up @@ -832,7 +845,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
// note: this doesn't fetch anything from the /state endpoint, as it doesn't report much beyond name/version,
// the equivalent of the beat /state metrics end up in /shipper
shipperHTTPStreams = append(shipperHTTPStreams, map[string]interface{}{
idKey: "metrics-monitoring-shipper",
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
Expand All @@ -846,7 +859,7 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
"processors": createProcessorsForJSONInput(name, comp.ID, monitoringNamespace, b.agentInfo),
},
map[string]interface{}{
idKey: "metrics-monitoring-shipper-stats",
idKey: fmt.Sprintf("%s-shipper-stats", monitoringMetricsUnitID),
"data_stream": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic_agent.%s", name),
Expand All @@ -864,8 +877,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI

inputs := []interface{}{
map[string]interface{}{
idKey: "metrics-monitoring-beats",
"name": "metrics-monitoring-beats",
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
"type": "beat/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
Expand All @@ -874,8 +887,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
"streams": beatsStreams,
},
map[string]interface{}{
idKey: "metrics-monitoring-agent",
"name": "metrics-monitoring-agent",
idKey: fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-agent", monitoringMetricsUnitID),
"type": "http/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
Expand All @@ -888,8 +901,8 @@ func (b *BeatsMonitor) injectMetricsInput(cfg map[string]interface{}, componentI
// if we have shipper data, inject the extra inputs
if len(shipperHTTPStreams) > 0 {
inputs = append(inputs, map[string]interface{}{
idKey: "metrics-monitoring-shipper",
"name": "metrics-monitoring-shipper",
idKey: fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-shipper", monitoringMetricsUnitID),
"type": "http/metrics",
useOutputKey: monitoringOutput,
"data_stream": map[string]interface{}{
Expand Down
29 changes: 19 additions & 10 deletions internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,16 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
if _, exists := processor["add_fields"]; !exists {
continue
}
p := Processor{}
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &p); err != nil {
streamProc := Processor{}
if err := json.Unmarshal([]byte(mapstr.M(processor).String()), &streamProc); err != nil {
t.Errorf("could not decode processor config: %q, err: %s", "foo", err)
}
if p.AddFields.Target != "component" {
if streamProc.AddFields.Target != "component" {
continue
}

binary := p.AddFields.Fields.Binary
componentID := p.AddFields.Fields.ID
binary := streamProc.AddFields.Fields.Binary
componentID := streamProc.AddFields.Fields.ID

// The elastic-Agent is a special case, handle it first
if strings.Contains(streamID, "monitoring-agent") {
Expand All @@ -186,11 +186,20 @@ func TestMonitoringConfigComponentFields(t *testing.T) {
}
continue
}
if binary != "filebeat" {
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
}
if componentID != "filestream-default" {
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
if !strings.Contains(componentID, "monitoring") {
if binary != "filebeat" {
t.Errorf("expecting fields['binary'] = 'filebeat', got %q", binary)
}
if componentID != "filestream-default" {
t.Errorf("expecting fields['id'] = 'filestream-default', got %q", componentID)
}
} else {
if binary != "filebeat" && binary != "metricbeat" {
t.Errorf("expected monitoring compoent to be metricbeat or filebeat, got %s", binary)
}
if componentID != monitoringFilesUnitsID && componentID != "beat/metrics-monitoring" && componentID != "http/metrics-monitoring" {
t.Errorf("got unxpected monitoring component ID: %s", componentID)
}
}

}
Expand Down
16 changes: 9 additions & 7 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, indexPattern, client)
return PerformQueryForRawQuery(ctx, queryRaw, indexPattern, client)
}

// GetIndexTemplatesForPattern lists all index templates on the system
Expand Down Expand Up @@ -362,7 +362,7 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)

}

Expand Down Expand Up @@ -434,7 +434,7 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
}

// GetLogsForDataset returns any logs associated with the datastream
Expand Down Expand Up @@ -525,7 +525,7 @@ func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.I
},
}

return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
}

// GetLogsForIndexWithContext returns any logs that match the given condition
Expand All @@ -536,7 +536,7 @@ func GetLogsForIndexWithContext(ctx context.Context, client elastictransport.Int
},
}

return performQueryForRawQuery(ctx, indexQuery, index, client)
return PerformQueryForRawQuery(ctx, indexQuery, index, client)
}

// GetPing performs a basic ping and returns ES config info
Expand All @@ -561,7 +561,8 @@ func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, erro

}

func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
// PerformQueryForRawQuery executes the ES query specified by queryRaw
func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(queryRaw)
if err != nil {
Expand All @@ -576,6 +577,7 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
es.Search.WithSize(300),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
Expand Down Expand Up @@ -613,7 +615,7 @@ func FindMatchingLogLinesForAgentWithContext(ctx context.Context, client elastic
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
}

// GetLogsForDatastream returns any logs associated with the datastream
Expand Down
32 changes: 32 additions & 0 deletions pkg/testing/tools/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"time"

"github.com/elastic/elastic-agent-libs/kibana"
Expand Down Expand Up @@ -89,3 +90,34 @@ func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, err

return dashboards, nil
}

// InstallPackageFromDefaultFile allows for a test ideom where a JSON policy file can be loaded, and then updated with variables that are specific to a given test.
// This can allow a single JSON policy file to be reused across multiple tests.
// existingPolicyID should be the ID of an agent policy that was already created with InstallAgentWithPolicy()
func InstallPackageFromDefaultFile(ctx context.Context, client *kibana.Client, packagePolicyName string, packageVersion string, policyJsonPath string, policyUUID string, existingPolicyID string) (kibana.PackagePolicyResponse, error) {
installPackage := kibana.PackagePolicyRequest{}

jsonRaw, err := os.ReadFile(policyJsonPath)
if err != nil {
return kibana.PackagePolicyResponse{}, fmt.Errorf("error reading JSON policy file: %w", err)
}

err = json.Unmarshal(jsonRaw, &installPackage)
if err != nil {
return kibana.PackagePolicyResponse{}, fmt.Errorf("error unmarshaling json: %w", err)
}

installPackage.Package.Version = packageVersion
installPackage.ID = policyUUID
installPackage.PolicyID = existingPolicyID
installPackage.Namespace = "default"
installPackage.Name = fmt.Sprintf("%s-test-%s", packagePolicyName, policyUUID)
installPackage.Vars = map[string]interface{}{}

resp, err := client.InstallFleetPackage(ctx, installPackage)
if err != nil {
return kibana.PackagePolicyResponse{}, fmt.Errorf("error installing fleet package: %w", err)
}

return resp, nil
}
26 changes: 2 additions & 24 deletions testing/integration/agent_long_running_leak_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package integration
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -122,33 +121,12 @@ func (runner *ExtendedRunner) SetupSuite() {
policyResp, err := tools.InstallAgentWithPolicy(ctx, runner.T(), installOpts, runner.agentFixture, runner.info.KibanaClient, basePolicy)
require.NoError(runner.T(), err)

// install system package
runner.InstallPackage(ctx, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)

// install cef
runner.InstallPackage(ctx, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)

}

func (runner *ExtendedRunner) InstallPackage(ctx context.Context, name string, version string, cfgFile string, policyUUID string, policyID string) {
installPackage := kibana.PackagePolicyRequest{}

jsonRaw, err := os.ReadFile(cfgFile)
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "system", "1.53.1", "agent_long_test_base_system_integ.json", uuid.New().String(), policyResp.ID)
require.NoError(runner.T(), err)

err = json.Unmarshal(jsonRaw, &installPackage)
_, err = tools.InstallPackageFromDefaultFile(ctx, runner.info.KibanaClient, "apache", "1.17.0", "agent_long_test_apache.json", uuid.New().String(), policyResp.ID)
require.NoError(runner.T(), err)

installPackage.Package.Version = version
installPackage.ID = policyUUID
installPackage.PolicyID = policyID
installPackage.Namespace = "default"
installPackage.Name = fmt.Sprintf("%s-long-test-%s", name, policyUUID)
installPackage.Vars = map[string]interface{}{}

runner.T().Logf("Installing %s package....", name)
_, err = runner.info.KibanaClient.InstallFleetPackage(ctx, installPackage)
require.NoError(runner.T(), err, "error creating fleet package")
}

func (runner *ExtendedRunner) TestHandleLeak() {
Expand Down
Loading

0 comments on commit 4594088

Please sign in to comment.