Skip to content

Commit

Permalink
count 429s in elasticsearch output (#8056)
Browse files Browse the repository at this point in the history
* count 429s in elasticsearch output
  • Loading branch information
graphaelli authored and ph committed Aug 27, 2018
1 parent 8c26770 commit 109dcce
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Count HTTP 429 responses in the elasticsearch output {pull}8056[8056]

*Auditbeat*

Expand Down
16 changes: 11 additions & 5 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type bulkResultStats struct {
duplicates int // number of events failed with `create` due to ID already being indexed
fails int // number of failed events (can be retried)
nonIndexable int // number of failed events (not indexable -> must be dropped)
tooMany int // number of events receiving HTTP 429 Too Many Requests
}

var (
Expand Down Expand Up @@ -343,6 +344,7 @@ func (client *Client) publishEvents(
st.Failed(failed)
st.Dropped(dropped)
st.Duplicate(duplicates)
st.ErrTooMany(stats.tooMany)
}

if failed > 0 {
Expand Down Expand Up @@ -515,11 +517,15 @@ func bulkCollectPublishFails(
continue // ok
}

if status < 500 && status != 429 {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
if status < 500 {
if status == http.StatusTooManyRequests {
stats.tooMany++
} else {
// hard failure, don't collect
logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg)
stats.nonIndexable++
continue
}
}

debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
Expand Down
6 changes: 4 additions & 2 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ func TestCollectPublishFailMiddle(t *testing.T) {
events := []publisher.Event{event, eventFail, event}

reader := newJSONReader(response)
res, _ := bulkCollectPublishFails(reader, events)
res, stats := bulkCollectPublishFails(reader, events)
assert.Equal(t, 1, len(res))
if len(res) == 1 {
assert.Equal(t, eventFail, res[0])
}
assert.Equal(t, stats, bulkResultStats{acked: 2, fails: 1, tooMany: 1})
}

func TestCollectPublishFailAll(t *testing.T) {
Expand All @@ -139,9 +140,10 @@ func TestCollectPublishFailAll(t *testing.T) {
events := []publisher.Event{event, event, event}

reader := newJSONReader(response)
res, _ := bulkCollectPublishFails(reader, events)
res, stats := bulkCollectPublishFails(reader, events)
assert.Equal(t, 3, len(res))
assert.Equal(t, events, res)
assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3})
}

func TestCollectPipelinePublishFail(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions libbeat/outputs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Stats struct {
active *monitoring.Uint // events sent and waiting for ACK/fail from output
duplicates *monitoring.Uint // events sent and waiting for ACK/fail from output
dropped *monitoring.Uint // total number of invalid events dropped by the output
tooMany *monitoring.Uint // total number of too many requests replies from output

//
// Output network connection stats
Expand All @@ -56,6 +57,7 @@ func NewStats(reg *monitoring.Registry) *Stats {
dropped: monitoring.NewUint(reg, "events.dropped"),
duplicates: monitoring.NewUint(reg, "events.duplicates"),
active: monitoring.NewUint(reg, "events.active"),
tooMany: monitoring.NewUint(reg, "events.toomany"),

writeBytes: monitoring.NewUint(reg, "write.bytes"),
writeErrors: monitoring.NewUint(reg, "write.errors"),
Expand Down Expand Up @@ -117,6 +119,13 @@ func (s *Stats) Cancelled(n int) {
}
}

// ErrTooMany updates the number of Too Many Requests responses reported by the output.
func (s *Stats) ErrTooMany(n int) {
if s != nil {
s.tooMany.Add(uint64(n))
}
}

// WriteError increases the write I/O error metrics.
func (s *Stats) WriteError(err error) {
if s != nil {
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Observer interface {
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
}

type emptyObserver struct{}
Expand All @@ -51,3 +52,4 @@ func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}

0 comments on commit 109dcce

Please sign in to comment.