diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 45578316834..286da533dcd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971] - Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770] - Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961] +- Count HTTP 429 responses in the elasticsearch output {pull}8056[8056] *Auditbeat* diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 2bd1c06ab37..308767b1eaa 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -112,6 +112,7 @@ type bulkResultStats struct { duplicates int // number of events failed with `create` due to ID already being indexed fails int // number of failed events (can be retried) nonIndexable int // number of failed events (not indexable -> must be dropped) + tooMany int // number of events receiving HTTP 429 Too Many Requests } var ( @@ -343,6 +344,7 @@ func (client *Client) publishEvents( st.Failed(failed) st.Dropped(dropped) st.Duplicate(duplicates) + st.ErrTooMany(stats.tooMany) } if failed > 0 { @@ -515,11 +517,13 @@ func bulkCollectPublishFails( continue // ok } - if status < 500 && status != 429 { + if status == http.StatusTooManyRequests { + stats.tooMany++ // soft failure: count it and fall through to the retryable-failure path below + } else if status < 500 { // hard failure, don't collect logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg) stats.nonIndexable++ continue } debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) diff --git a/libbeat/outputs/elasticsearch/client_test.go 
b/libbeat/outputs/elasticsearch/client_test.go index 0c92d17b100..dd1d5104a00 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -119,11 +119,12 @@ func TestCollectPublishFailMiddle(t *testing.T) { events := []publisher.Event{event, eventFail, event} reader := newJSONReader(response) - res, _ := bulkCollectPublishFails(reader, events) + res, stats := bulkCollectPublishFails(reader, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) } + assert.Equal(t, bulkResultStats{acked: 2, fails: 1, tooMany: 1}, stats) } func TestCollectPublishFailAll(t *testing.T) { @@ -139,9 +140,10 @@ func TestCollectPublishFailAll(t *testing.T) { events := []publisher.Event{event, event, event} reader := newJSONReader(response) - res, _ := bulkCollectPublishFails(reader, events) + res, stats := bulkCollectPublishFails(reader, events) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) + assert.Equal(t, bulkResultStats{fails: 3, tooMany: 3}, stats) } func TestCollectPipelinePublishFail(t *testing.T) { diff --git a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go index 43dbf5da459..28ac20627da 100644 --- a/libbeat/outputs/metrics.go +++ b/libbeat/outputs/metrics.go @@ -33,6 +33,7 @@ type Stats struct { active *monitoring.Uint // events sent and waiting for ACK/fail from output duplicates *monitoring.Uint // events sent and waiting for ACK/fail from output dropped *monitoring.Uint // total number of invalid events dropped by the output + tooMany *monitoring.Uint // total number of Too Many Requests (HTTP 429) replies from output // // Output network connection stats @@ -56,6 +57,7 @@ func NewStats(reg *monitoring.Registry) *Stats { dropped: monitoring.NewUint(reg, "events.dropped"), duplicates: monitoring.NewUint(reg, "events.duplicates"), active: monitoring.NewUint(reg, "events.active"), + tooMany: monitoring.NewUint(reg, "events.toomany"), writeBytes: monitoring.NewUint(reg, "write.bytes"), 
writeErrors: monitoring.NewUint(reg, "write.errors"), @@ -117,6 +119,13 @@ func (s *Stats) Cancelled(n int) { } } +// ErrTooMany adds n to the count of Too Many Requests (HTTP 429) responses reported by the output. It is a no-op on a nil *Stats. +func (s *Stats) ErrTooMany(n int) { + if s != nil { + s.tooMany.Add(uint64(n)) + } +} + // WriteError increases the write I/O error metrics. func (s *Stats) WriteError(err error) { if s != nil { diff --git a/libbeat/outputs/observer.go b/libbeat/outputs/observer.go index 2cc4995399c..ed14920182d 100644 --- a/libbeat/outputs/observer.go +++ b/libbeat/outputs/observer.go @@ -30,6 +30,7 @@ type Observer interface { WriteBytes(int) // report number of bytes being written ReadError(error) // report an I/O error on read ReadBytes(int) // report number of bytes being read + ErrTooMany(int) // report number of Too Many Requests (HTTP 429) responses } type emptyObserver struct{} @@ -51,3 +52,4 @@ func (*emptyObserver) WriteError(error) {} func (*emptyObserver) WriteBytes(int) {} func (*emptyObserver) ReadError(error) {} func (*emptyObserver) ReadBytes(int) {} +func (*emptyObserver) ErrTooMany(int) {}