From 218e16c7665a29d2af808358efe942cce2a242da Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 4 Jun 2020 15:03:45 +0200 Subject: [PATCH] Cherry-pick #18738 to 7.x: Improve thread safety of fingerprint processor (#18926) --- CHANGELOG.next.asciidoc | 1 + libbeat/processors/fingerprint/fingerprint.go | 15 +++--- .../fingerprint/fingerprint_test.go | 50 ++++++++++++++++++- 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f8ab731739e..34295f209ec 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -120,6 +120,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - [Autodiscover] Check if runner is already running before starting again. {pull}18564[18564] - Fix `keystore add` hanging under Windows. {issue}18649[18649] {pull}18654[18654] - Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818] +- Fix potential race condition in fingerprint processor. {pull}18738[18738] *Auditbeat* diff --git a/libbeat/processors/fingerprint/fingerprint.go b/libbeat/processors/fingerprint/fingerprint.go index 90d051cec77..3028f7a20fe 100644 --- a/libbeat/processors/fingerprint/fingerprint.go +++ b/libbeat/processors/fingerprint/fingerprint.go @@ -19,7 +19,6 @@ package fingerprint import ( "fmt" - "hash" "io" "time" @@ -39,7 +38,7 @@ const processorName = "fingerprint" type fingerprint struct { config Config fields []string - hash hash.Hash + hash hashMethod } // New constructs a new fingerprint processor. @@ -49,12 +48,15 @@ func New(cfg *common.Config) (processors.Processor, error) { return nil, makeErrConfigUnpack(err) } - fields := common.MakeStringSet(config.Fields...) + // The fields array must be sorted, to guarantee that we always + // get the same hash for a similar set of configured keys. + // The call `ToSlice` always returns a sorted slice. + fields := common.MakeStringSet(config.Fields...).ToSlice() p := &fingerprint{ config: config, - hash: config.Method(), - fields: fields.ToSlice(), + hash: config.Method, + fields: fields, } return p, nil @@ -62,8 +64,7 @@ func New(cfg *common.Config) (processors.Processor, error) { // Run enriches the given event with fingerprint information func (p *fingerprint) Run(event *beat.Event) (*beat.Event, error) { - hashFn := p.hash - hashFn.Reset() + hashFn := p.hash() err := p.writeFields(hashFn, event.Fields) if err != nil { diff --git a/libbeat/processors/fingerprint/fingerprint_test.go b/libbeat/processors/fingerprint/fingerprint_test.go index 6274af7c0b2..957ab589876 100644 --- a/libbeat/processors/fingerprint/fingerprint_test.go +++ b/libbeat/processors/fingerprint/fingerprint_test.go @@ -24,11 +24,56 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" ) +func TestWithConfig(t *testing.T) { + cases := map[string]struct { + config common.MapStr + input common.MapStr + want string + }{ + "hello world": { + config: common.MapStr{ + "fields": []string{"message"}, + }, + input: common.MapStr{ + "message": "hello world", + }, + want: "50110bbfc1757f21caacc966b33f5ea2235c4176739447e0b3285dec4e1dd2a4", + }, + "with string escaping": { + config: common.MapStr{ + "fields": []string{"message"}, + }, + input: common.MapStr{ + "message": `test message "hello world"`, + }, + want: "14a0364b79acbe4c78dd5e77db2c93ae8c750518b32581927d50b3eef407184e", + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + config := common.MustNewConfigFrom(test.config) + p, err := New(config) + require.NoError(t, err) + + testEvent := &beat.Event{ + Timestamp: time.Now(), + Fields: test.input.Clone(), + } + newEvent, err := p.Run(testEvent) + v, err := newEvent.GetValue("fingerprint") + assert.NoError(t, err) + assert.Equal(t, test.want, v) + }) + } +} + func TestHashMethods(t *testing.T) { testFields := common.MapStr{ "field1": "foo", @@ -393,7 +438,10 @@ func BenchmarkHashMethods(b *testing.B) { b.Run(method, func(b *testing.B) { b.ResetTimer() for _, e := range events { - p.Run(&e) + _, err := p.Run(&e) + if err != nil { + b.Fatal(err) + } } }) }