From 225eb85d5f6ff3cc788c71925ae789bbe0d569a1 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 1 Jul 2020 12:32:00 +0200 Subject: [PATCH] [Filebeat][httpjson] Add date_cursor to httpjson input (#19483) * Add date_cursor to httpjson input * Add changelog entry * Fix tests * Default to UTC date * Add date_cursor validations and better error message * Run fmt update (cherry picked from commit 775f13428966aca51fa608641c4b957bd0ae794c) --- CHANGELOG.next.asciidoc | 1 + go.mod | 2 +- x-pack/filebeat/input/httpjson/config.go | 53 ++++++++++++++ x-pack/filebeat/input/httpjson/config_test.go | 27 +++++++ .../filebeat/input/httpjson/httpjson_test.go | 48 ++++++++++++ x-pack/filebeat/input/httpjson/input.go | 73 ++++++++++++++++++- 6 files changed, 199 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9867bd2a4d6c..32e7509dc136 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -67,6 +67,7 @@ field. You can revert this change by configuring tags for the module and omittin - Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953] - Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892] - Adds `split_events_by` option to httpjson input. {pull}19246[19246] +- Adds `date_cursor` option to httpjson input. {pull}19483[19483] *Heartbeat* diff --git a/go.mod b/go.mod index e602b70c2f86..a21c2dbb7959 100644 --- a/go.mod +++ b/go.mod @@ -166,7 +166,7 @@ require ( golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 golang.org/x/text v0.3.2 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - golang.org/x/tools v0.0.0-20200630154851-b2d8b0336632 + golang.org/x/tools v0.0.0-20200701041122-1837592efa10 google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb google.golang.org/grpc v1.29.1 diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index 2fcc2fc8941c..63d20221de47 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -7,6 +7,7 @@ package httpjson import ( "regexp" "strings" + "text/template" "time" "github.com/pkg/errors" @@ -35,6 +36,7 @@ type config struct { RetryWaitMax time.Duration `config:"retry.wait_max"` TLS *tlscommon.Config `config:"ssl"` URL string `config:"url" validate:"required"` + DateCursor *DateCursor `config:"date_cursor"` } // Pagination contains information about httpjson pagination settings @@ -65,6 +67,54 @@ type RateLimit struct { Remaining string `config:"remaining"` } +type DateCursor struct { + Enabled *bool `config:"enabled"` + Field string `config:"field" validate:"required"` + URLField string `config:"url_field" validate:"required"` + ValueTemplate *Template `config:"value_template"` + DateFormat string `config:"date_format"` + InitialInterval time.Duration `config:"initial_interval"` +} + +type Template struct { + *template.Template +} + +func (t *Template) Unpack(in string) error { + tpl, err := template.New("tpl").Parse(in) + if err != nil { + return err + } + + *t = Template{Template: tpl} + + return nil +} + +// IsEnabled returns true if the `enable` field is set to true in the yaml. +func (dc *DateCursor) IsEnabled() bool { + return dc != nil && (dc.Enabled == nil || *dc.Enabled) +} + +// IsEnabled returns true if the `enable` field is set to true in the yaml. +func (dc *DateCursor) GetDateFormat() string { + if dc.DateFormat == "" { + return time.RFC3339 + } + return dc.DateFormat +} + +func (dc *DateCursor) Validate() error { + if dc.DateFormat == "" { + return nil + } + now := time.Now().Format(dc.DateFormat) + if _, err := time.Parse(dc.DateFormat, now); err != nil { + return errors.New("invalid configuration: date_format is not a valid date layout") + } + return nil +} + func (c *config) Validate() error { switch strings.ToUpper(c.HTTPMethod) { case "GET", "POST": @@ -81,6 +131,9 @@ func (c *config) Validate() error { } } if c.Pagination != nil { + if c.DateCursor.IsEnabled() { + return errors.Errorf("invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms") + } if c.Pagination.Header != nil { if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 { return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously") diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index cfec6a2440be..a86c2aa76db6 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -8,6 +8,7 @@ import ( "context" "os" "testing" + "time" "github.com/pkg/errors" "golang.org/x/oauth2/google" @@ -350,6 +351,32 @@ func TestConfigOauth2Validation(t *testing.T) { "url": "localhost", }, }, + { + name: "date_cursor must fail in combination with pagination", + expectedErr: "invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms accessing config", + input: map[string]interface{}{ + "date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo"}, + "pagination": map[string]interface{}{ + "header": map[string]interface{}{"field_name": "foo", "regex_pattern": "bar"}, + }, + "url": "localhost", + }, + }, + { + name: "date_cursor.date_format will fail if invalid", + expectedErr: "invalid configuration: date_format is not a valid date layout accessing 'date_cursor'", + input: map[string]interface{}{ + "date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": "1234"}, + "url": "localhost", + }, + }, + { + name: "date_cursor must work with a valid date_format", + input: map[string]interface{}{ + "date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": time.RFC3339}, + "url": "localhost", + }, + }, } for _, c := range cases { diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 4e70fe72472f..75374404eeae 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -7,6 +7,7 @@ package httpjson import ( "context" "encoding/json" + "fmt" "io/ioutil" "log" "math/rand" @@ -727,3 +728,50 @@ func TestArrayWithSplitResponse(t *testing.T) { } }) } + +func TestCursor(t *testing.T) { + m := map[string]interface{}{ + "http_method": "GET", + "date_cursor.field": "@timestamp", + "date_cursor.url_field": "$filter", + "date_cursor.value_template": "alertCreationTime ge {{.}}", + "date_cursor.initial_interval": "10m", + "date_cursor.date_format": "2006-01-02T15:04:05Z", + } + + timeNow = func() time.Time { + t, _ := time.Parse("2006-01-02T15:04:05Z", "2002-10-02T15:10:00Z") + return t + } + + const ( + expectedQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z" + expectedNextCursorValue = "2002-10-02T15:00:01Z" + expectedNextQuery = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A01Z" + ) + var gotQuery string + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + gotQuery = r.URL.Query().Encode() + w.Write([]byte(`[{"@timestamp":"2002-10-02T15:00:00Z"},{"@timestamp":"2002-10-02T15:00:01Z"}]`)) + })) + + runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) { + group, _ := errgroup.WithContext(context.Background()) + group.Go(input.run) + + events, ok := out.waitForEvents(2) + if !ok { + t.Fatalf("Expected 2 events, but got %d.", len(events)) + } + input.Stop() + + if err := group.Wait(); err != nil { + t.Fatal(err) + } + + assert.Equal(t, expectedQuery, gotQuery) + assert.Equal(t, expectedNextCursorValue, input.nextCursorValue) + assert.Equal(t, fmt.Sprintf("%s?%s", ts.URL, expectedNextQuery), input.getURL()) + }) +} diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 39726eac1774..f06f0cd82bd1 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -12,6 +12,7 @@ import ( "io/ioutil" "net" "net/http" + "net/url" "regexp" "strconv" "sync" @@ -37,6 +38,9 @@ const ( var userAgent = useragent.UserAgent("Filebeat", false) +// for testing +var timeNow = time.Now + func init() { err := input.Register(inputName, NewInput) if err != nil { @@ -55,6 +59,8 @@ type HttpjsonInput struct { workerCancel context.CancelFunc // Used to signal that the worker should stop. workerOnce sync.Once // Guarantees that the worker goroutine is only started once. workerWg sync.WaitGroup // Waits on worker goroutine. + + nextCursorValue string } // RequestInfo struct has the information for generating an HTTP request @@ -343,6 +349,7 @@ func createRequestInfoFromBody(m common.MapStr, idField string, requestField str // processHTTPRequest processes HTTP request, and handles pagination if enabled func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *RequestInfo) error { + ri.URL = in.getURL() for { req, err := in.createHTTPRequest(ctx, ri) if err != nil { @@ -407,8 +414,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl in.log.Debug("http.response.body is not a valid JSON object", string(responseData)) return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj) } - - if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled() { + if mm != nil && in.config.Pagination.IsEnabled() { if in.config.Pagination.Header != nil { // Pagination control using HTTP Header url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern) @@ -427,7 +433,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl continue } else { // Pagination control using HTTP Body fields - ri, err := createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri) + ri, err = createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri) if err != nil { return err } @@ -441,10 +447,70 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl continue } } + if mm != nil && in.config.DateCursor.IsEnabled() { + in.advanceCursor(common.MapStr(mm)) + } return nil } } +func (in *HttpjsonInput) getURL() string { + if !in.config.DateCursor.IsEnabled() { + return in.config.URL + } + + var dateStr string + if in.nextCursorValue == "" { + t := timeNow().UTC().Add(-in.config.DateCursor.InitialInterval) + dateStr = t.Format(in.config.DateCursor.GetDateFormat()) + } else { + dateStr = in.nextCursorValue + } + + url, err := url.Parse(in.config.URL) + if err != nil { + return in.config.URL + } + + q := url.Query() + + var value string + if in.config.DateCursor.ValueTemplate == nil { + value = dateStr + } else { + buf := new(bytes.Buffer) + if err := in.config.DateCursor.ValueTemplate.Execute(buf, dateStr); err != nil { + return in.config.URL + } + value = buf.String() + } + + q.Set(in.config.DateCursor.URLField, value) + + url.RawQuery = q.Encode() + + return url.String() +} + +func (in *HttpjsonInput) advanceCursor(m common.MapStr) { + v, err := m.GetValue(in.config.DateCursor.Field) + if err != nil { + in.log.Warnf("date_cursor field: %q", err) + return + } + switch t := v.(type) { + case string: + _, err := time.Parse(in.config.DateCursor.GetDateFormat(), t) + if err != nil { + return + } + in.nextCursorValue = t + default: + in.log.Warn("date_cursor field must be a string, cursor will not advance") + return + } +} + func (in *HttpjsonInput) run() error { ctx, cancel := context.WithCancel(in.workerCtx) defer cancel() @@ -455,7 +521,6 @@ func (in *HttpjsonInput) run() error { } ri := &RequestInfo{ - URL: in.URL, ContentMap: common.MapStr{}, Headers: in.HTTPHeaders, }