From 3fa11affc08fd49c895c470f1e450a23283ae0aa Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 6 Feb 2025 10:20:27 -0500 Subject: [PATCH 1/3] Add conditions to copy_fields processor This commit adds conditions to the `copy_fields` processor from the monitoring Filebeat to prevent it from failing and spamming the event logger at debug level with: `target field xxx already exists, drop or rename this field first` --- ...-conditions-to-copy_fields-processors.yaml | 33 +++++++++++++++++++ .../application/monitoring/v1_monitor.go | 28 ++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 changelog/fragments/1738855375-Add-conditions-to-copy_fields-processors.yaml diff --git a/changelog/fragments/1738855375-Add-conditions-to-copy_fields-processors.yaml b/changelog/fragments/1738855375-Add-conditions-to-copy_fields-processors.yaml new file mode 100644 index 00000000000..63abe2db800 --- /dev/null +++ b/changelog/fragments/1738855375-Add-conditions-to-copy_fields-processors.yaml @@ -0,0 +1,33 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: > + Add conditions to copy_fields processors to prevent spamming the debug logs + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6730 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/5299 diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 55e9a4dc3f9..bb3bd43a487 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -424,6 +424,13 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] "to": "data_stream.dataset_original", }, }, + "when": map[string]any{ + "not": map[string]any{ + "has_fields": []any{ + "data_stream.dataset_original", + }, + }, + }, }, }, // drop the dataset field so following copy_field can copy to it @@ -443,6 +450,13 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] "to": "data_stream.dataset", }, }, + "when": map[string]any{ + "not": map[string]any{ + "has_fields": []any{ + "data_stream.dataset", + }, + }, + }, "fail_on_error": false, "ignore_missing": true, }, @@ -450,6 +464,13 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] // possible it's a log message from agent itself (doesn't have component.dataset) map[string]interface{}{ "copy_fields": map[string]interface{}{ + "when": map[string]any{ + "not": map[string]any{ + "has_fields": []any{ + "data_stream.dataset", + }, + }, + }, "fields": []interface{}{ map[string]interface{}{ "from": "data_stream.dataset_original", @@ -471,6 +492,13 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] // update event.dataset with the now used data_stream.dataset map[string]interface{}{ "copy_fields": map[string]interface{}{ + "when": map[string]any{ + "not": map[string]any{ + "has_fields": []any{ + "event.dataset", + }, + }, + }, "fields": []interface{}{ map[string]interface{}{ "from": "data_stream.dataset", From f46a6d9da7bc40ce126c9c088dcb1a82a6eb746f Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 6 Feb 2025 13:23:38 -0500 Subject: [PATCH 2/3] Add integration tests --- testing/integration/event_logging_test.go | 45 +++++++++++++++++++--- testing/integration/logs_ingestion_test.go | 2 +- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/testing/integration/event_logging_test.go b/testing/integration/event_logging_test.go index 90397c5af68..bca83b9986f 100644 --- a/testing/integration/event_logging_test.go +++ b/testing/integration/event_logging_test.go @@ -10,6 +10,7 @@ import ( "bufio" "bytes" "context" + "encoding/json" "fmt" "net/http" "net/http/httputil" @@ -37,11 +38,11 @@ outputs: hosts: - %s protocol: http - preset: balanced - + preset: latency inputs: - type: filestream id: your-input-id + log_level: debug streams: - id: your-filestream-stream-id data_stream: @@ -87,7 +88,7 @@ func TestEventLogFile(t *testing.T) { esURL := startMockES(t) logFilepath := path.Join(t.TempDir(), t.Name()) - generateLogFile(t, logFilepath, time.Millisecond*100, 1) + generateLogFile(t, logFilepath, time.Millisecond*100, 20) cfg := fmt.Sprintf(eventLogConfig, esURL, logFilepath) @@ -126,6 +127,7 @@ func TestEventLogFile(t *testing.T) { // Now the Elastic-Agent is running, so validate the Event log file. requireEventLogFileExistsWithData(t, agentFixture) + requireNoCopyProcessorError(t, agentFixture) // The diagnostics command is already tested by another test, // here we just want to validate the events log behaviour @@ -307,7 +309,7 @@ func addOverwriteToPolicy(t *testing.T, info *define.Info, policyName, policyID } } -func requireEventLogFileExistsWithData(t *testing.T, agentFixture *atesting.Fixture) { +func readEventLogFile(t *testing.T, agentFixture *atesting.Fixture) string { // Now the Elastic-Agent is running, so validate the Event log file. // Because the path changes based on the Elastic-Agent version, we // use glob to find the file @@ -338,8 +340,39 @@ func requireEventLogFileExistsWithData(t *testing.T, agentFixture *atesting.Fixt t.Fatalf("cannot read file '%s': %s", logFileName, err) } - logEntry := string(logEntryBytes) - expectedStr := "Cannot index event" + return string(logEntryBytes) +} + +func requireNoCopyProcessorError(t *testing.T, agentFixture *atesting.Fixture) { + data := readEventLogFile(t, agentFixture) + for _, line := range strings.Split(data, "\n") { + logEntry := struct { + LogLogger string `json:"log.logger"` + Message string `json:"message"` + }{} + + if len(line) == 0 { + continue + } + if err := json.Unmarshal([]byte(line), &logEntry); err != nil { + t.Fatalf("could not parse log entry: %q", line) + } + + if logEntry.LogLogger == "copy_fields" { + if strings.Contains(logEntry.Message, "Failed to copy fields") { + if strings.Contains(logEntry.Message, "already exists, drop or rename this field first") { + t.Fatal("copy_fields processor must not fail") + } + } + } + } +} + +func requireEventLogFileExistsWithData(t *testing.T, agentFixture *atesting.Fixture) { + logEntry := readEventLogFile(t, agentFixture) + // That's part of the generated event that is logged by the 'processor' + // logger at level debug + expectedStr := "TestEventLogFile" if !strings.Contains(logEntry, expectedStr) { t.Errorf( "did not find the expected log entry ('%s') in the events log file", diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go index b6bdef4e31b..d37dc12dca4 100644 --- a/testing/integration/logs_ingestion_test.go +++ b/testing/integration/logs_ingestion_test.go @@ -128,7 +128,7 @@ func startMockES(t *testing.T) string { uid, clusterUUID, nil, - time.Now().Add(time.Hour), 0, 0, 0, 100, 0)) + time.Now().Add(time.Hour), 0, 0, 0, 0, 0)) s := httptest.NewServer(mux) t.Cleanup(s.Close) From 7f3b86f3dbd9d0756445f3e08a1be80b053b9334 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 10 Feb 2025 12:42:26 -0500 Subject: [PATCH 3/3] Remove unnecessary conditions --- .../application/monitoring/v1_monitor.go | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index bb3bd43a487..02d25d61dcb 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -424,13 +424,6 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] "to": "data_stream.dataset_original", }, }, - "when": map[string]any{ - "not": map[string]any{ - "has_fields": []any{ - "data_stream.dataset_original", - }, - }, - }, }, }, // drop the dataset field so following copy_field can copy to it @@ -450,13 +443,6 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] "to": "data_stream.dataset", }, }, - "when": map[string]any{ - "not": map[string]any{ - "has_fields": []any{ - "data_stream.dataset", - }, - }, - }, "fail_on_error": false, "ignore_missing": true, }, @@ -492,13 +478,6 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components [] // update event.dataset with the now used data_stream.dataset map[string]interface{}{ "copy_fields": map[string]interface{}{ - "when": map[string]any{ - "not": map[string]any{ - "has_fields": []any{ - "event.dataset", - }, - }, - }, "fields": []interface{}{ map[string]interface{}{ "from": "data_stream.dataset",