Skip to content

Commit

Permalink
Cherry-pick #18534 to 7.7: Address Okta input issue #18530 (#18549)
Browse files Browse the repository at this point in the history
* Address Okta input issue #18530 (#18534)
  • Loading branch information
Lei Qiu authored May 15, 2020
1 parent 50b50e6 commit f39ac65
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Unescape file name from SQS message. {pull}18370[18370]
- Improve cisco asa and ftd pipelines' failure handler to avoid mapping temporary fields. {issue}18391[18391] {pull}18392[18392]
- Fix `googlecloud.audit` pipeline to only take in fields that are explicitly defined by the dataset. {issue}18465[18465] {pull}18472[18472]
- Fix a rate limit related issue in httpjson input for Okta module. {issue}18530[18530] {pull}18534[18534]

*Heartbeat*

Expand Down
22 changes: 21 additions & 1 deletion x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func TestCreateRequestInfoFromBody(t *testing.T) {
}
}

// Test getRateLimit function with a remaining quota, expect to receive 0, nil.
func TestGetRateLimitCase1(t *testing.T) {
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
Expand All @@ -350,6 +351,7 @@ func TestGetRateLimitCase1(t *testing.T) {
}
}

// Test getRateLimit function with a past time, expect to receive 0, nil.
func TestGetRateLimitCase2(t *testing.T) {
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "10")
Expand All @@ -361,7 +363,25 @@ func TestGetRateLimitCase2(t *testing.T) {
Remaining: "X-Rate-Limit-Remaining",
}
epoch, err := getRateLimit(header, rateLimit)
if err != nil || epoch != 1581658643 {
if err != nil || epoch != 0 {
t.Fatal("Failed to test getRateLimit.")
}
}

// Test getRateLimit function with a time yet to come, expect to receive <reset-value>, nil.
func TestGetRateLimitCase3(t *testing.T) {
epoch := time.Now().Unix() + 100
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "10")
header.Add("X-Rate-Limit-Remaining", "0")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10))
rateLimit := &RateLimit{
Limit: "X-Rate-Limit-Limit",
Reset: "X-Rate-Limit-Reset",
Remaining: "X-Rate-Limit-Remaining",
}
epoch2, err := getRateLimit(header, rateLimit)
if err != nil || epoch2 != epoch {
t.Fatal("Failed to test getRateLimit.")
}
}
Expand Down
16 changes: 12 additions & 4 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,10 @@ func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Rege
return "", nil
}

// getRateLimit get the rate limit value if specified in the HTTP Header of the response
// getRateLimit get the rate limit value if specified in the HTTP Header of the response,
// and returns an init64 value in seconds since unix epoch for rate limit reset time.
// When there is a remaining rate limit quota, or when the rate limit reset time has expired, it
// returns 0 for the epoch value.
func getRateLimit(header http.Header, rateLimit *RateLimit) (int64, error) {
if rateLimit != nil {
if rateLimit.Remaining != "" {
Expand All @@ -222,6 +225,9 @@ func getRateLimit(header http.Header, rateLimit *RateLimit) (int64, error) {
if err != nil {
return 0, errors.Wrapf(err, "failed to parse rate-limit reset value")
}
if time.Unix(epoch, 0).Sub(time.Now()) <= 0 {
return 0, nil
}
return epoch, nil
}
}
Expand All @@ -235,12 +241,14 @@ func (in *HttpjsonInput) applyRateLimit(ctx context.Context, header http.Header,
if err != nil {
return err
}
if epoch == 0 {
t := time.Unix(epoch, 0)
w := time.Until(t)
if epoch == 0 || w <= 0 {
in.log.Debugf("Rate Limit: No need to apply rate limit.")
return nil
}
t := time.Unix(epoch, 0)
in.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t)
ticker := time.NewTicker(time.Until(t))
ticker := time.NewTicker(w)
defer ticker.Stop()
select {
case <-ctx.Done():
Expand Down

0 comments on commit f39ac65

Please sign in to comment.