Skip to content

Commit

Permalink
count 429s in elasticsearch output
Browse files Browse the repository at this point in the history
  • Loading branch information
graphaelli committed Aug 22, 2018
1 parent a2b0f15 commit a397c1f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Count HTTP 429 responses in the elasticsearch output {pull}8056[8056]

*Auditbeat*

Expand Down
16 changes: 11 additions & 5 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type bulkResultStats struct {
duplicates int // number of events failed with `create` due to ID already being indexed
fails int // number of failed events (can be retried)
nonIndexable int // number of failed events (not indexable -> must be dropped)
tooMany int // number of events receiving HTTP 429 Too Many Requests
}

var (
Expand Down Expand Up @@ -343,6 +344,7 @@ func (client *Client) publishEvents(
st.Failed(failed)
st.Dropped(dropped)
st.Duplicate(duplicates)
st.ErrTooMany(stats.tooMany)
}

if failed > 0 {
Expand Down Expand Up @@ -515,11 +517,15 @@ func bulkCollectPublishFails(
continue // ok
}

if status < 500 && status != 429 {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
if status < 500 {
if status == http.StatusTooManyRequests {
stats.tooMany++
} else {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
}
}

debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
Expand Down
9 changes: 9 additions & 0 deletions libbeat/outputs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Stats struct {
active *monitoring.Uint // events sent and waiting for ACK/fail from output
duplicates *monitoring.Uint // events sent and waiting for ACK/fail from output
dropped *monitoring.Uint // total number of invalid events dropped by the output
tooMany *monitoring.Uint // total number of too many requests replies from output

//
// Output network connection stats
Expand All @@ -56,6 +57,7 @@ func NewStats(reg *monitoring.Registry) *Stats {
dropped: monitoring.NewUint(reg, "events.dropped"),
duplicates: monitoring.NewUint(reg, "events.duplicates"),
active: monitoring.NewUint(reg, "events.active"),
tooMany: monitoring.NewUint(reg, "events.toomany"),

writeBytes: monitoring.NewUint(reg, "write.bytes"),
writeErrors: monitoring.NewUint(reg, "write.errors"),
Expand Down Expand Up @@ -117,6 +119,13 @@ func (s *Stats) Cancelled(n int) {
}
}

// ErrTooMany updates the number of Too Many Requests responses reported by the output.
func (s *Stats) ErrTooMany(n int) {
if s != nil {
s.tooMany.Add(uint64(n))
}
}

// WriteError increases the write I/O error metrics.
func (s *Stats) WriteError(err error) {
if s != nil {
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Observer interface {
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
}

type emptyObserver struct{}
Expand All @@ -51,3 +52,4 @@ func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}

0 comments on commit a397c1f

Please sign in to comment.