diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9f3c0eb6af2..51d23bac8ef 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Unescape file name from SQS message. {pull}18370[18370] - Improve cisco asa and ftd pipelines' failure handler to avoid mapping temporary fields. {issue}18391[18391] {pull}18392[18392] - Fix `googlecloud.audit` pipeline to only take in fields that are explicitly defined by the dataset. {issue}18465[18465] {pull}18472[18472] +- Fix a rate limit related issue in httpjson input for Okta module. {issue}18530[18530] {pull}18534[18534] *Heartbeat* diff --git a/x-pack/filebeat/input/httpjson/httpjson_test.go b/x-pack/filebeat/input/httpjson/httpjson_test.go index 4faa190544e..33643ac29ab 100644 --- a/x-pack/filebeat/input/httpjson/httpjson_test.go +++ b/x-pack/filebeat/input/httpjson/httpjson_test.go @@ -334,6 +334,7 @@ func TestCreateRequestInfoFromBody(t *testing.T) { } } +// Test getRateLimit function with a remaining quota, expect to receive 0, nil. func TestGetRateLimitCase1(t *testing.T) { header := make(http.Header) header.Add("X-Rate-Limit-Limit", "120") @@ -350,6 +351,7 @@ func TestGetRateLimitCase1(t *testing.T) { } } +// Test getRateLimit function with a past time, expect to receive 0, nil. func TestGetRateLimitCase2(t *testing.T) { header := make(http.Header) header.Add("X-Rate-Limit-Limit", "10") @@ -361,7 +363,25 @@ func TestGetRateLimitCase2(t *testing.T) { Remaining: "X-Rate-Limit-Remaining", } epoch, err := getRateLimit(header, rateLimit) - if err != nil || epoch != 1581658643 { + if err != nil || epoch != 0 { + t.Fatal("Failed to test getRateLimit.") + } +} + +// Test getRateLimit function with a time yet to come, expect to receive , nil. +func TestGetRateLimitCase3(t *testing.T) { + epoch := time.Now().Unix() + 100 + header := make(http.Header) + header.Add("X-Rate-Limit-Limit", "10") + header.Add("X-Rate-Limit-Remaining", "0") + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10)) + rateLimit := &RateLimit{ + Limit: "X-Rate-Limit-Limit", + Reset: "X-Rate-Limit-Reset", + Remaining: "X-Rate-Limit-Remaining", + } + epoch2, err := getRateLimit(header, rateLimit) + if err != nil || epoch2 != epoch { t.Fatal("Failed to test getRateLimit.") } } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 4801223c583..b792e95a841 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -201,7 +201,10 @@ func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Rege return "", nil } -// getRateLimit get the rate limit value if specified in the HTTP Header of the response +// getRateLimit get the rate limit value if specified in the HTTP Header of the response, +// and returns an init64 value in seconds since unix epoch for rate limit reset time. +// When there is a remaining rate limit quota, or when the rate limit reset time has expired, it +// returns 0 for the epoch value. func getRateLimit(header http.Header, rateLimit *RateLimit) (int64, error) { if rateLimit != nil { if rateLimit.Remaining != "" { @@ -222,6 +225,9 @@ func getRateLimit(header http.Header, rateLimit *RateLimit) (int64, error) { if err != nil { return 0, errors.Wrapf(err, "failed to parse rate-limit reset value") } + if time.Unix(epoch, 0).Sub(time.Now()) <= 0 { + return 0, nil + } return epoch, nil } } @@ -235,12 +241,14 @@ func (in *HttpjsonInput) applyRateLimit(ctx context.Context, header http.Header, if err != nil { return err } - if epoch == 0 { + t := time.Unix(epoch, 0) + w := time.Until(t) + if epoch == 0 || w <= 0 { + in.log.Debugf("Rate Limit: No need to apply rate limit.") return nil } - t := time.Unix(epoch, 0) in.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t) - ticker := time.NewTicker(time.Until(t)) + ticker := time.NewTicker(w) defer ticker.Stop() select { case <-ctx.Done():